javascript - Using rx.js, how do I emit a memoized result from an existing observable sequence on a timer? -
i'm teaching myself reactive programming rxjs, , i've set myself challenge of creating observable stream emit same result subscriber no matter what.
i've memoized creation of http "get" stream given specific url, , i'm trying act on stream every 2 seconds, outcome being each tick of timer, i'll extract cached/memoized http result original stream.
import superagent 'superagent'; import _ 'lodash'; // cached function, returning stream emits http response object var httpget = _.memoize(function(url) { var req = superagent.get(url); req = req.end.bind(req); return rx.observable.fromnodecallback(req)(); }); // assume created externally , have access response$ var response$ = httpget('/ontologies/acl.ttl'); // every 2 seconds, emit memoized http response rx.observable.timer(0, 2000) .map(() => response$) .flatmap($ => $) .subscribe(response => { console.log('got response!'); }); i sure i'd have stick call replay() in there somewhere, no matter do, fresh http call initiated every 2 seconds. how can structure can construct observable url , have emit same http result subsequent subscribers?
edit
found way result want, feel missing something, , should able refactor more streamlined approach:
var httpget = _.memoize(function(url) { var subject = new rx.replaysubject(); try { superagent.get(url).end((err, res) => { if(err) { subject.onerror(err); } else { subject.onnext(res); subject.oncompleted(); } }); } catch(e) { subject.onerror(e); } return subject.asobservable(); });
your first code sample closer way it
var httpget = _.memoize(function(url) { var req = superagent.get(url); return rx.observable.fromnodecallback(req.end, req)(); }); however, isn't working because there appears bug in fromnodecallback. work around till fixed, think looking asyncsubject instead of replaysubject. latter works, former designed scenario (and doesn't have overhead of array creation + runtime checks cache expiration if matters you).
var httpget = _.memoize(function(url) { var subject = new rx.asyncsubject(); var req = superagent.get(url); rx.observable.fromnodecallback(req.end, req)().subscribe(subject); return subject.asobservable(); }); finally, though map appreciates thinking of it, can simplify timer code using flatmap overload takes observable directly:
rx.observable.timer(0, 2000) .flatmap($response) .subscribe(response => { console.log('got response'); });
Comments
Post a Comment