Have you heard about the IObservable/IObserver support in Microsoft StreamInsight 1.1? Then you probably want to try it out. If this is your first foray into the IObservable/IObserver pattern, this blog post is for you! StreamInsight 1.1 introduced the ability to use IEnumerable and IObservable objects as event sources and sinks. The IEnumerable case is pretty straightforward, since many data collections are already exposed as this type; Colin has already covered it in his blog. Creating your own IObservable event source is a little more involved but no less exciting – here is a primer. First, let’s look at a very simple observable data source. All it does is publish an integer at regular intervals to its registered observers. (For more information on IObservable, see http://msdn.microsoft.com/en-us/library/dd990377.aspx).
sealed class RandomSubject : IObservable<int>, IDisposable
{
private bool _done;
private readonly List<IObserver<int>> _observers;
private readonly Random _random;
private readonly object _sync;
private readonly Timer _timer;
private readonly int _timerPeriod; /// <summary>
/// Random observable subject. It produces an integer at regular intervals.
/// </summary>
/// <param name="timerPeriod">Timer period (in milliseconds)</param>
public RandomSubject(int timerPeriod)
{
_done = false;
_observers = new List<IObserver<int>>();
_random = new Random();
_sync = new object();
_timer = new Timer(EmitRandomValue);
_timerPeriod = timerPeriod;
Schedule();
} public IDisposable Subscribe(IObserver<int> observer)
{
lock (_sync)
{
_observers.Add(observer);
}
return new Subscription(this, observer);
} public void OnNext(int value)
{
lock (_sync)
Read more: Microsoft StreamInsight
{
private bool _done;
private readonly List<IObserver<int>> _observers;
private readonly Random _random;
private readonly object _sync;
private readonly Timer _timer;
private readonly int _timerPeriod; /// <summary>
/// Random observable subject. It produces an integer at regular intervals.
/// </summary>
/// <param name="timerPeriod">Timer period (in milliseconds)</param>
public RandomSubject(int timerPeriod)
{
_done = false;
_observers = new List<IObserver<int>>();
_random = new Random();
_sync = new object();
_timer = new Timer(EmitRandomValue);
_timerPeriod = timerPeriod;
Schedule();
} public IDisposable Subscribe(IObserver<int> observer)
{
lock (_sync)
{
_observers.Add(observer);
}
return new Subscription(this, observer);
} public void OnNext(int value)
{
lock (_sync)
Read more: Microsoft StreamInsight
0 comments:
Post a Comment