...
...
public class CustomerReactiveRepository {
public IObservable<Customer> GetAll()
{
return Observable.Create<Customer>(observer => OnSubscribe(observer));
}
private static Action OnSubscribe(IObserver<Customer> observer)
{
try {
var client = new CustomerServiceClient();
client.CountCompleted += (sender, e) =>
{
if (e.Result > 1000)
{
var state = new GetState { Count = e.Result, Offset = 0, Step = 500 };
((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step, state);
}
else ((CustomerServiceClient)sender).GetAsync(0, e.Result);
};
client.GetCompleted += (sender, e) =>
{
foreach (var c in e.Result)
observer.OnNext(c);
var state = e.UserState as GetState;
if (state != null && state.Offset + state.Step < state.Count)
{
state.Offset += state.Step;
((CustomerServiceClient)sender).GetAsync(state.Offset, state.Step,
state);
}
else {
((CustomerServiceClient)sender).CloseAsync();
observer.OnCompleted();
}
};
client.CountAsync();
}
catch (Exception e)
{
observer.OnError(e);
}
return () => { };
}
private class GetState {
public int Offset { get; set; }
public int Step { get; set; }
public int Count { get; set; }
}
}