Overview
Reactive Extensions (Rx) is the hot new toy out of Microsoft's DevLabs. It is included in Silverlight 4 and .NET 4, and is available as a download for Silverlight 3 and .NET 3.5. It provides a new LINQ-like way of managing and handling events. More information and downloads are available at the DevLabs project page.
I'd read quite a bit about Rx but hadn't taken the time to have a play with it until today, when I found myself looking at the code for a Model-View-ViewModel messaging implementation. The MVVM Messaging pattern consists of a static/global class through which separate parts of the application can communicate without requiring knowledge of each other. Classes can register to receive messages from the Messenger/Mediator, or send messages through the Messenger/Mediator for consumption elsewhere.
Hang on, I thought, that's the Rx observable/observer pair. I wondered whether the two would sit well together, so I had a go.
The basics
The Messaging Dispatcher pattern implementation, at its simplest, requires a method for registering to receive messages and a method for sending them. The IObservable<T> interface consists of a single method, Subscribe, which takes an instance of IObserver<T>. Messenger interfaces typically provide a Subscribe method which takes an Action<TMessage> delegate, but the Rx extension methods give us that for free if we implement IObservable<T>, so let's just have the IMessenger interface extend it:
public interface IMessenger : IObservable<IMessage>
{
    void Send(IMessage message);
}
The Messenger class can be implemented as a very simple wrapper around the Rx Subject<T> class, which is both an IObservable<T> and an IObserver<T>; essentially, calling OnNext on the Subject causes it to call OnNext on its own observers.
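public class Messenger : IMessenger
{
    // The Subject does the real work: it tracks observers and broadcasts to them.
    private readonly Subject<IMessage> _subject = new Subject<IMessage>();

    // Registering for messages is delegated straight to the Subject.
    public IDisposable Subscribe(IObserver<IMessage> observer)
    {
        return _subject.Subscribe(observer);
    }

    // Sending a message pushes it to every current subscriber.
    public void Send(IMessage message)
    {
        _subject.OnNext(message);
    }

    // Shared application-wide instance.
    private static readonly Messenger _default = new Messenger();

    public static Messenger Default
    {
        get { return _default; }
    }
}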
That was easy. That can't be it, can it?
Well, we get a lot of stuff for free with Rx. For example, the Subscribe method in the class above takes an instance of a class which implements IObserver<T>, but the System.Reactive.dll assembly provides a fistful of extension methods which accept one or more Action delegates and wrap them in an observer object that implements the interface for you.
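To make that less magical, here is a rough sketch of how a delegate-based subscribe could work, using only the IObservable<T> and IObserver<T> interfaces from the base class library. Everything in it is hypothetical and written for illustration: DelegateObserver<T>, SubscribeWith (named so it cannot clash with the real Rx Subscribe overloads) and MiniSubject<T>, a deliberately naive, non-thread-safe stand-in for Subject<T>. It is not how Rx itself is implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that forwards OnNext to a delegate.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;

    public DelegateObserver(Action<T> onNext)
    {
        if (onNext == null) throw new ArgumentNullException("onNext");
        _onNext = onNext;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { throw error; } // a choice made for this sketch
    public void OnCompleted() { }
}

public static class DelegateSubscribeSketch
{
    // Hypothetical extension method: wraps the delegate and subscribes the wrapper.
    public static IDisposable SubscribeWith<T>(this IObservable<T> source, Action<T> onNext)
    {
        if (source == null) throw new ArgumentNullException("source");
        return source.Subscribe(new DelegateObserver<T>(onNext));
    }
}

// Naive stand-in for Subject<T>: OnNext is rebroadcast to every subscribed observer.
// Not thread-safe; for illustration only.
public sealed class MiniSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // ToArray takes a snapshot so observers may unsubscribe during dispatch.
    public void OnNext(T value) { foreach (var o in _observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in _observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in _observers.ToArray()) o.OnCompleted(); }

    // Disposing the subscription removes the observer from the list.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _list;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> list, IObserver<T> observer)
        {
            _list = list;
            _observer = observer;
        }

        public void Dispose() { _list.Remove(_observer); }
    }
}

public static class SubscribeDemo
{
    // Returns the values the delegate received.
    public static List<string> Run()
    {
        var received = new List<string>();
        var subject = new MiniSubject<string>();

        IDisposable subscription = subject.SubscribeWith(s => received.Add(s));
        subject.OnNext("first");   // delivered to the delegate
        subscription.Dispose();    // unsubscribe
        subject.OnNext("second");  // nobody is listening any more

        return received;
    }
}
```

Calling SubscribeDemo.Run() from a console application's Main shows the two halves working together: the delegate only sees values sent while its subscription is alive, which is the same contract the Messenger relies on.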
Specifying which messages to receive
One of the key features of a Messaging Dispatcher is the ability to register to receive only messages of specific types. I initially looked for an OfType<T> method on IObservable<T> (to match the OfType<T> on IEnumerable) but found none in the release I was using, possibly because OfType<T> is implemented on the non-generic IEnumerable and there is no non-generic IObservable. This is reasonable enough: if IEnumerable were designed today, I don't suppose there'd be a non-generic version of it, or indeed of any of the collection classes.
But there is still a valid use case for being able to filter an IObservable<T> for more-derived types than T, as in this MVVM Messenger class. We could add an OfType<T> method to the Messenger class directly, but it is both more useful and more fun to apply it directly to the IObservable<T> type as an extension method.
Here we could make use of the Subject<T> primitive again: subscribe to the IObservable<T> to be filtered, and whenever it fires OnNext with an argument of the requested type, pass that on to the Subject. A query expression says the same thing more compactly, and that is the form shown below. I've implemented OfType to include sub-types, and OfTypeExact to match only the specified type.
public static class IObservableExtensions
{
    /// <summary>
    /// Filter an IObservable for the specified type or derivatives.
    /// </summary>
    public static IObservable<TResult> OfType<TResult>(this IObservable<object> src)
    {
        return from item in src
               where item is TResult
               select (TResult)item;
    }
}
This code highlights the "LINQ to Events" aspect of the Rx framework; we can construct natural queries against IObservable<T> sources just as we can against IEnumerable<T>.
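OfTypeExact is not shown in this post, so the following is only a guess at one way it could be written, not the version in the download. To keep it self-contained it avoids the Rx query operators and builds the filter by hand on the base class library interfaces. The message types (including UrgentDisplayMessage), FixedSource and Collector are all hypothetical helpers for the demonstration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message types, for illustration only.
public interface IMessage { }
public class DisplayMessage : IMessage { }
public class UrgentDisplayMessage : DisplayMessage { }

public static class ExactTypeFilterSketch
{
    // Passes on only items whose runtime type is exactly TResult.
    public static IObservable<TResult> OfTypeExact<TResult>(this IObservable<object> source)
    {
        if (source == null) throw new ArgumentNullException("source");
        return new ExactTypeObservable<TResult>(source);
    }

    private sealed class ExactTypeObservable<TResult> : IObservable<TResult>
    {
        private readonly IObservable<object> _source;
        public ExactTypeObservable(IObservable<object> source) { _source = source; }

        // Subscribes to the source through a filtering observer.
        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            return _source.Subscribe(new ExactTypeObserver<TResult>(observer));
        }
    }

    private sealed class ExactTypeObserver<TResult> : IObserver<object>
    {
        private readonly IObserver<TResult> _target;
        public ExactTypeObserver(IObserver<TResult> target) { _target = target; }

        public void OnNext(object value)
        {
            // "is" would also accept sub-types; comparing runtime types does not.
            if (value != null && value.GetType() == typeof(TResult))
                _target.OnNext((TResult)value);
        }

        public void OnError(Exception error) { _target.OnError(error); }
        public void OnCompleted() { _target.OnCompleted(); }
    }
}

public static class ExactTypeDemo
{
    // Pushes a fixed set of messages to each subscriber, then completes.
    private sealed class FixedSource : IObservable<IMessage>
    {
        private readonly IMessage[] _items;
        public FixedSource(params IMessage[] items) { _items = items; }

        public IDisposable Subscribe(IObserver<IMessage> observer)
        {
            foreach (var item in _items) observer.OnNext(item);
            observer.OnCompleted();
            return new NoOp();
        }
    }

    private sealed class NoOp : IDisposable { public void Dispose() { } }

    // Records every message it is handed.
    private sealed class Collector : IObserver<DisplayMessage>
    {
        public readonly List<DisplayMessage> Items = new List<DisplayMessage>();
        public void OnNext(DisplayMessage value) { Items.Add(value); }
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    // Counts how many of three messages pass the exact-type filter.
    public static int CountExactMatches()
    {
        var source = new FixedSource(
            new DisplayMessage(), new UrgentDisplayMessage(), new DisplayMessage());
        var collector = new Collector();
        source.OfTypeExact<DisplayMessage>().Subscribe(collector);
        return collector.Items.Count;
    }
}
```

The only real difference from OfType is the test in OnNext: the UrgentDisplayMessage in the demo is a DisplayMessage as far as "is" is concerned, but its runtime type differs, so the exact filter drops it and CountExactMatches counts just the two plain DisplayMessage instances. Note that the source is an IObservable<IMessage>; the call compiles because IObservable<T> is covariant in .NET 4.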
Consuming messages
Now, any component which needs to react to messages can subscribe very easily, using methods which are already familiar from the IEnumerable paradigm. To use a common example, here is a WPF Window which looks for DisplayMessage messages and displays a Message Box:
public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent();

        // Subscribe returns an IDisposable; keep it and dispose it if the window
        // should stop receiving messages.
        MvvmRx.Messaging.Messenger.Default
            .OfType<DisplayMessage>()
            .Subscribe(ShowMessageBox);
    }

    private void ShowMessageBox(DisplayMessage message)
    {
        MessageBox.Show(message.Text, message.Caption);
    }
}
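The message types themselves are not shown above. As a minimal sketch, assuming IMessage is nothing more than a marker interface and that DisplayMessage carries just the Text and Caption members the window reads, they might look something like this (the constructor and its null handling are my own guesses, not taken from the download):

```csharp
using System;

// Assumed to be a marker interface; its definition is not shown in this post.
public interface IMessage { }

// Hypothetical shape, inferred from the Text and Caption members used by ShowMessageBox.
public class DisplayMessage : IMessage
{
    private readonly string _text;
    private readonly string _caption;

    public DisplayMessage(string text, string caption)
    {
        if (text == null) throw new ArgumentNullException("text");
        _text = text;
        _caption = caption ?? string.Empty; // a missing caption becomes an empty one
    }

    public string Text { get { return _text; } }
    public string Caption { get { return _caption; } }
}
```

With types like these, any other part of the application could trigger the message box with Messenger.Default.Send(new DisplayMessage("File saved.", "MvvmRx")), without knowing that MainWindow exists.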
You can download the solution file containing the code for this post, as well as a sample application, at http://github.com/markrendle/MvvmRx.