'How to Make a Custom Extension for Reactive Extensions

It is not hard to find an example of how to make a custom LINQ extension method. But I can't find an example of how to make a custom Rx extension method.

Can someone point me to a resource or post an example? I'm using the latest (ver 2.2.5).

I'm interested in making a custom "Cast" or custom "Select". I can reflect the source code of existing extensions, and it is still non-obvious.

Thanks.



Solution 1:[1]

The most common way to implement custom extensions in Rx (other than composing existing operations) is via Observable.Create. It is highly advisable to do it this way since Create takes care of a host of concerns that it is non-trivial to implement yourself.

However you do it, be sure to digest the Rx Design Guidelines which explain the rules and conventions you should to follow - many of which Observable.Create takes care of for you.

Here is an example Cast implementation:

public static class ObservableExtensions
{
    public static IObservable<TResult> Cast<TSource, TResult>(
        this IObservable<TSource> source)
    {
        return Observable.Create<TResult>(o =>
            source.Subscribe(x => {
                TResult cast;
                try
                {
                    cast = (TResult)(object)x;                        
                }
                catch (InvalidCastException ex)
                {
                    o.OnError(ex);
                    return;
                }
                o.OnNext(cast);
            },
            o.OnError,
            o.OnCompleted));
    }
}

I put this as is, because the double cast requirement isn't obvious (actually IEnumerable's Cast does the same thing, for the same reason). You can make this tweak to reduce the type arguments to one, given that the object cast is necessary:

public static class ObservableExtensions
{
    public static IObservable<TResult> Cast<TResult>(
        this IObservable<object> source)
    {
        return Observable.Create<TResult>(o =>
            source.Subscribe(x => {
                TResult cast;
                try
                {
                    cast = (TResult)x;
                }
                catch (InvalidCastException ex)
                {
                    o.OnError(ex);
                    return;
                }
                o.OnNext(cast);
            },
            o.OnError,
            o.OnCompleted));
    }       
}

You can have a look at IntroToRx for some more on Observable.Create too.

Solution 2:[2]

Usually it is possible to create a new Rx operator by combining existing operators. For example the Cast operator could be implemented on top of the built-in Select operator like this:

public static IObservable<TResult> Cast<TSource, TResult>(
    this IObservable<TSource> source)
{
    return source.Select(x => (TResult)(object)x);
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Blue
Solution 2 Theodor Zoulias