Asbestos Supply

2010-01-28 Rx Framework Subscriber Actions are blocking

The NY ALT.NET Meetup group met last night and Scott Weinstein delivered a terrific presentation on the Reactive Framework (also known as Rx Framework).  If you haven't heard about it, the Rx Framework is a framework for composing (and consuming) asynchronous events.

The way the framework works is that you subscribe to an IObservable and get notified of new events.  Since this is all done async, one of my questions was whether a single subscriber's handling of these events is blocking with respect to itself: does the handler for one event have to finish before that same subscriber is handed the next event?  That's probably still not the best way to phrase the question, but the code's easy enough to understand.  Anyway, Scott said he thought so, but that it would be worth testing, so I did.

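static void Main(string[] args)
{
    // Subscribe returns an IDisposable that represents the subscription.
    var subscription = Observable.Range(1, 5).Subscribe((int i) => {
        DateTime future = DateTime.Now.AddSeconds(5);
        DateTime lastPrint = DateTime.MinValue;
        if (i % 2 == 0)
        {
            // Busy-wait for five seconds on even values to simulate a slow handler.
            while (DateTime.Now < future)
            {
                // TotalMilliseconds is the whole elapsed time; Milliseconds is only the 0-999 component.
                if (DateTime.Now.Subtract(lastPrint).TotalMilliseconds >= 900)
                {
                    Console.WriteLine("Sleeping...");
                    lastPrint = DateTime.Now;
                }
            }
        }
        Console.WriteLine("Sleeper {0}", i);
    });

    // Keep the process alive while the notifications arrive.
    Console.ReadLine();
}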

Basically, my IObservable is counting from 1 to 5 but taking its time on 2 and 4.  So the question was: will the Action passed to Subscribe be called in order, with each call blocking the next, or will some calls (like 3 and 5) complete before others (like 2 and 4), which would mean the calls don't block each other?

And the output shows that they block, which is what Scott (and I) expected since it really is the only way that makes sense, but it was worth a look-see.

Sleeper 1
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeper 2
Sleeper 3
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeper 4
Sleeper 5
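
One way to see why blocking is "the only way that makes sense" is to hand-roll a tiny observable.  This is only a sketch and not how Rx itself is implemented: CountingSource, LoggingObserver and Demo are hypothetical names I made up, and the only real API used is the IObservable/IObserver pair from the System namespace.  The source pushes values by calling OnNext in a plain loop, so the loop can't move on to the next value until the previous call returns.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for Observable.Range; not part of Rx.
class CountingSource : IObservable<int>
{
    private readonly int _start, _count;
    public CountingSource(int start, int count) { _start = start; _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Each OnNext call has to return before the loop advances,
        // so a slow observer delays its own later notifications.
        for (int i = _start; i < _start + _count; i++)
            observer.OnNext(i);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}

// Hypothetical observer that records when each notification starts and ends.
class LoggingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value)
    {
        Log.Add("start " + value);
        if (value % 2 == 0) Thread.Sleep(50); // simulate a slow handler on even values
        Log.Add("end " + value);
    }

    public void OnError(Exception error) { Log.Add("error"); }
    public void OnCompleted() { Log.Add("done"); }
}

static class Demo
{
    public static List<string> Run()
    {
        var observer = new LoggingObserver();
        new CountingSource(1, 3).Subscribe(observer);
        return observer.Log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

In this sketch every "start" is followed by its own "end" before the next "start" shows up, even for the slow value 2, which is the same shape as the Sleeper output above.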

In case you were worried though, other subscribers are not blocked just because one subscriber blocks itself.

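static void Main(string[] args)
{
    var range = Observable.Range(1, 5);

    // First subscriber: slow on even values.
    range.Subscribe((int i) =>
    {
        DateTime future = DateTime.Now.AddSeconds(5);
        DateTime lastPrint = DateTime.MinValue;
        if (i % 2 == 0)
        {
            // Busy-wait for five seconds to simulate a slow handler.
            while (DateTime.Now < future)
            {
                // TotalMilliseconds is the whole elapsed time; Milliseconds is only the 0-999 component.
                if (DateTime.Now.Subtract(lastPrint).TotalMilliseconds >= 900)
                {
                    Console.WriteLine("Sleeping...");
                    lastPrint = DateTime.Now;
                }
            }
        }
        Console.WriteLine("Sleeper {0}", i);
    });

    // Second subscriber: prints each value immediately.
    range.Subscribe((int i) => { Console.WriteLine(i); });
    Console.ReadLine();
}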

This renders the output below.  It obviously isn't deterministic because this is all async, so the order may be slightly different on every run, but it proves the point:

1
Sleeper 1
2
3
4
5
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeper 2
Sleeper 3
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeping...
Sleeper 4
Sleeper 5
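
The independence between subscribers can be sketched without Rx as well.  The big assumption here is that each subscription is driven on its own thread; whether that holds in Rx depends on the version and on the scheduler in use, so treat this as an illustration of the idea and not a description of the library.  SubscribeOnNewThread and IndependentSubscribers are hypothetical names.  The slow subscriber refuses to finish value 2 until the fast subscriber has seen everything, so if the two were serialized behind each other the program would never finish.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

static class IndependentSubscribers
{
    // Hypothetical helper: pushes 1..count to onNext on a dedicated thread.
    static Thread SubscribeOnNewThread(int count, Action<int> onNext)
    {
        var thread = new Thread(() =>
        {
            for (int i = 1; i <= count; i++) onNext(i);
        });
        thread.Start();
        return thread;
    }

    public static string[] Run()
    {
        var log = new ConcurrentQueue<string>();
        var fastDone = new ManualResetEventSlim(false);

        // Slow subscriber: blocks on value 2 until the fast subscriber is done.
        var slow = SubscribeOnNewThread(3, i =>
        {
            if (i == 2) fastDone.Wait();
            log.Enqueue("Sleeper " + i);
        });

        // Fast subscriber: signals once it has handled the last value.
        var fast = SubscribeOnNewThread(3, i =>
        {
            log.Enqueue(i.ToString());
            if (i == 3) fastDone.Set();
        });

        slow.Join();
        fast.Join();
        return log.ToArray();
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

As with the real output, the exact interleaving varies from run to run, but "3" always lands before "Sleeper 2" and "Sleeper 2" always lands before "Sleeper 3".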