'RxJS pipe to filter into 2+branches

Let's assume we have a Subject, that can return a given set of values, all of which might require a different approach to them. We could debate whether or not that should ever be the case BUT not quite the point of my inquiry.

So...to handle that we might have our code look something like:

var sub = new Subject(); 

sub.subscribe( 
    data => {
        if (caseA) {
            // do something
        }
        if (caseB) {
            // do something else
        }
     });

which is all swell and great and all...but I was wondering if there's some operator that we can chain to make it more rx-ish? Thought of filter, but chaining caseA and caseB will just make it not work (obviously) since it ends up filtering both out.

So, my question boils down to this: is it possible to have anything remotely resembling the bellow pseudocode? Any operator(s) you know that would work like this?

var sub = new Subject(); 

sub.pipe(
    magicOperator(//handles caseA),
    magicOperator(//handles caseB),
)
subscribe( 
    data => {
        // handles common thread
     });


Solution 1:[1]

I think a more rxjs way to solve this problem is similar to what @ritaj suggested, but by using a static merge operator to the partitioned streams ( or concat if you want to process them sequentially )

let sub = new Subject();
// i'm using typescript syntax, but you can refer to them by index
let [leftPartition, rightPartition] = sub.pipe( partition( [predicate] ) )
// apply custom logic to the left and right partition
rightPartition = rightPartition.pipe( map( x => x + 1 ) )
leftPartition = leftPartition.pipe( map( x => x - 1 ) )

merge(
     leftPartition,
     rightPartition
).pipe(
    tap( 
        ( value ) => {
            // this stream consumes both right and left partition outputs
        }
    )
).subscribe();

Partition Operator
Merge ( static )

The static merge version is imported from the rxjs package, and accept any number of arguments ( and you can even pass array arguments using spread )

I reassigned the observable to the same variable after applying the map operator. I don't know if it is a good practice or not, but can be improved or separated to a different variable if you need the source intact )

Ps. I usually try to keep the subscription callback free from any implementation, since most of the stuff you do at the end of the stream is a side effect, I put them into a tap operator ( same arguments as subscribe )

Solution 2:[2]

just create an object with functions for case switching can make code prettier

const handler={
  case1:()=>..handle case1,
  case2:()=>...handle case2
}

sub.map(result=>handler[result])

// experimental switchCase operator 
const switchCase=handlers=>(source)=>source.map(val=>handlers[val]())

// usage
sub.pipe(switchCase({
   case1:....,
   case2:....
}))

Solution 3:[3]

sub.pipe(
    magicOperator(//handles caseA),
    magicOperator(//handles caseB),
)

You can get something very close to this style by doing some gymnastics with custom pipeable operators, aka OperatorFunctions. They can achieve something like this, allowing you to easily disperse an Observable laterally:

sub.pipe(
doAllTheMagic([magicOperator(/*handles caseA*/),magicOperator(/*handles caseB*/)])
)

For example, if I have a function like this:

function hiFromDad(): OperatorFunction<string, string> {
  return (source: Observable<string>) =>
    source.pipe(map((name) => `Hi ${name}, I'm Dad`));
}

This can be piped directly using Observable.pipe:

of('World')
  .pipe(hiFromDad())
  .subscribe(console.log);

If I want to use it with a buddy:

function hiFromBob(): OperatorFunction<string, string> {
  return (source: Observable<string>) =>
    source.pipe(map((name) => `Hi ${name}, I'm Bob`));
}

I can send the same stream to both, letting both of them emit as necessary:

function hiFromAll(
  hiFromOps: Array<OperatorFunction<string, string>>
): OperatorFunction<string, string> {
  return (source: Observable<string>) =>
    merge(...hiFromOps.map((op) => source.pipe(op)));
}

...

of('World')
  .pipe(hiFromAll([hiFromBob(), hiFromDad()]))
  .subscribe(console.log);

A teensy bit more about the OperatorFunction: In Typescript, you can describe its input and output. In the examples above, the OperatorFunctions all receive a string, and emit a string. In case you are like me and have trouble understanding documentation, a general description of a function returning an OperatorFunction would be:

function makeAPipeable([args]): OperatorFunction<T,U> {
return (source: T) => [output a U object]
}

See this Stackblitz.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2
Solution 3 Joseph Zabinski