'how to gracefully combine two Observables inside RXJS pipe
I'm building a log viewer with Angular.
When a user enters the page I need to load historical logs and start watching for new ones.
User can filter logs with a simple form that emits a query
object. Each time the query
changes process is restarted (i.e. "old" results are evicted, new history is loaded and new "live" stream is started).
I can do it in two ways but I'm not happy with either.
The first one I find easier to understand but violates DRY principle.
const historicalLogs = this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(
query => this.deviceLogsService.getLogsBefore(query, moment())
)
);
const futureLogs = this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(
query => timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
)
);
merge(historicalLogs, futureLogs)
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
The second doesn't violate DRY but I'm afraid that it will be hard to read/analyze in the future:
this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(query => concat([
this.deviceLogsService.getLogsBefore(query, moment()),
timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
]).pipe(mergeAll()))
)
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
I'll appreciate any suggestions on implementing it in a more readable and elegant way.
Solution 1:[1]
One way to simplify, is to create a couple functions that get your future and past logs:
private getHistoricalLogs(query) {
return this.deviceLogsService.getLogsBefore(query, moment());
}
private pollForFutureLogs(query) {
return timer(1000, 2000).pipe(
switchMap(() => this.deviceLogsService.getLogsAfter(query, moment()))
);
}
this.querySubject.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(query => concat([
this.getHistoricalLogs(query),
this.pollForFutureLogs(query)
]))
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
Instead of maintaining a separate logs
variable "outside the stream", you could simply emit an empty array initially inside your switchMap
and accumulate the emissions into a single array using scan
:
logs = this.querySubject.pipe(
debounceTime(250),
switchMap(query => concat([
of([]),
this.getHistoricalLogs(query),
this.pollForFutureLogs(query)
])),
scan((all, logs) => all.concat(logs), [])
);
logs.subscribe(() => this.scrollToVeryBottom());
Solution 2:[2]
Have you tried using combineLatest
?:
obs$.pipe(
...,
switchMap(query => combineLatest([
this.deviceLogsService.getLogsBefore(query, moment()),
timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
])),
map(([logsBefore, logsAfter]) => {...})
)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | BizzyBob |
Solution 2 | Some random IT boy |