javascript - Using rx.js, how do I emit a memoized result from an existing observable sequence on a timer? -


i'm teaching myself reactive programming rxjs, , i've set myself challenge of creating observable stream emit same result subscriber no matter what.

i've memoized creation of http "get" stream given specific url, , i'm trying act on stream every 2 seconds, outcome being each tick of timer, i'll extract cached/memoized http result original stream.

import superagent 'superagent'; import _ 'lodash';  // cached function, returning stream emits http response object var httpget = _.memoize(function(url) {   var req = superagent.get(url);   req = req.end.bind(req);   return rx.observable.fromnodecallback(req)(); });  // assume created externally , have access response$ var response$ = httpget('/ontologies/acl.ttl');  // every 2 seconds, emit memoized http response rx.observable.timer(0, 2000)   .map(() => response$)   .flatmap($ => $)   .subscribe(response => {     console.log('got response!');   }); 

i sure i'd have stick call replay() in there somewhere, no matter do, fresh http call initiated every 2 seconds. how can structure can construct observable url , have emit same http result subsequent subscribers?

edit
found way result want, feel missing something, , should able refactor more streamlined approach:

var httpget = _.memoize(function(url) {   var subject = new rx.replaysubject();   try {     superagent.get(url).end((err, res) => {       if(err) {         subject.onerror(err);       }       else {         subject.onnext(res);         subject.oncompleted();       }     });   }   catch(e) {     subject.onerror(e);   }   return subject.asobservable(); }); 

your first code sample closer way it

var httpget = _.memoize(function(url) {   var req = superagent.get(url);   return rx.observable.fromnodecallback(req.end, req)(); }); 

however, isn't working because there appears bug in fromnodecallback. work around till fixed, think looking asyncsubject instead of replaysubject. latter works, former designed scenario (and doesn't have overhead of array creation + runtime checks cache expiration if matters you).

var httpget = _.memoize(function(url) {    var subject = new rx.asyncsubject();   var req = superagent.get(url);   rx.observable.fromnodecallback(req.end, req)().subscribe(subject);   return subject.asobservable();  }); 

finally, though map appreciates thinking of it, can simplify timer code using flatmap overload takes observable directly:

rx.observable.timer(0, 2000)   .flatmap($response)   .subscribe(response => {     console.log('got response');   }); 

Comments