'Pausable buffer with RxJS

I'm trying to implement a togglable auto-save feature using RxJS streams. The goal is to:

  • While auto-save is enabled, send changes to the server as they come.
  • While auto-save is disabled, buffer the changes and send them to the server when auto-save is re-enabled.

Here is what I came across with:

autoSave$ = new BehaviorSubject(true);
change$ = new Subject();

change$.pipe(
  bufferToggle(
    autoSave$.pipe(filter(autoSave => autoSave === false)),
    () => autoSave$.pipe(filter(autoSave => autoSave === true)),
  ),
  concatMap(changes => changes),
  concatMap(change => apiService.patch(change)),
).subscribe(
  () => console.log('Change sent'),
  (error) => console.error(error),
);

Thanks to bufferToggle, I'm able to buffer the changes while autoSave is off and send them when it's re-enabled.

Problem is that while autoSave is enabled, nothing passes through. I understand it's because bufferToggle ignores the flow coming while its opening observable doesn't emit.

I feel that I should have a condition there to bypass the bufferToggle while autoSave is enabled, but all my attempts miserably failed.

Any idea to achieve this?



Solution 1:[1]

We can buffer events in-between autosave on and off using bufferToggle(on, off), and open a filtering window between off and on using windowToggle(off, on). And then we merge those together:

pausable buffer with bufferToggle and windowToggle

const on$ = autoSave$.filter(v=>v);
const off$ = autoSave$.filter(v=>!v);

const output$ =
  Observable.merge(
    changes$
      .bufferToggle(
        off$,
        ()=>on$
      )

    changes$
      .windowToggle(
        on$,
        ()=>off$
      )
  )
  .flatMap(x=>x) // < flattern buffer and window

Play with this example at https://thinkrx.io/gist/3d5161fc29b8b48194f54032fb6d2363

* Please, note that since buffer wraps values in Array — I've used another flatMap(v=>v) in the example to unwrap buffered values. You might want to disable this particular line to get arrays from buffers mixed with raw values.

Also, check my article "Pausable Observables in RxJS" to see more examples.

Hope this helps

Solution 2:[2]

Another solution. Just one observable to play / pause

export type PauseableOptions = 'paused' | 'playing'
export function pauseableBuffered(pauser$: Observable<PauseableOptions>) {
  return function _pauseableBuffer<T>(source$: Observable<T>): Observable<T> {
    let initialValue = 'paused'
    // if a value is already present (say a behaviour subject use that value as the initial value)
    const sub = pauser$.subscribe(v => initialValue = v)
    sub.unsubscribe()

    const _pauser$ = pauser$.pipe(startWith(initialValue), distinctUntilChanged(), shareReplay(1))

    const paused$ = _pauser$.pipe(filter((v) => v === 'paused'))
    const playing$ = _pauser$.pipe(filter((v) => v === 'playing'))

    const buffer$ = source$.pipe(bufferToggle(paused$, () => playing$))

    const playingStream$ = source$
      .pipe(
        withLatestFrom(_pauser$),
        filter(([_, state]) => state === 'playing'),
        map(([v]) => v)
      )

    return merge(
      buffer$.pipe(
        mergeMap(v => v)
      ),
      playingStream$
    )
  }
}
const stream$ = new Subject<number>()
const playPause$ = new BehaviorSubject<PauseableOptions>('playing')

const result: number[] = []

const sub = stream$.pipe(pauseableBuffered(playPause$))
 .subscribe((v) => result.push(v))

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Cameron