c# - ReactiveExtensions: Stop an observable from returning before the tasks it has spun off have finished?
I'm new to Rx .NET but have a business scenario that I think warrants it. However, I'm still having trouble wrapping my head around the initial design.
The Problem
- I have a large set of items, let's say 600k.
- I have a way of pulling these from the database in batches (let's say 1,000 at a shot).
- I want to run a process on these items in parallel, at X amount at a time (let's say 50 at a time).
- When we're done, I need to know, because I need to spit out some additional stats and ensure the long-running process returns.
This seems ideal for Reactive Extensions -- we have:
- Something feeding the list over time
- A set of things I want to do as the items come in
- A need to handle errors
- A need to handle completions.
Where I'm Getting Started
This seems like I should have the list of items be an observable, and the "looping" process that gets items from the database should "push" them to the observable, and the subscription to the observable would take over.
Where I'm Getting Stuck
- I'm a little unsure of the syntax.
- I'm a little unsure of how to handle the X degrees of parallelism limit.
- I'm unsure how I'll know when I hit completion. Should the loop that's pulling from the database call "OnComplete()" instead of "OnNext" at some point?
I'm hoping someone can help me break this down, conceptually, so I can better wrap my head around it. Thanks!
Code v3 -- a better method, but it still exits too quickly.
This is starting to feel better, but I know it's not quite there.
    public override async Task ProcessAsync(DataLoadRequest dataLoadRequest, Func<string, Task> createTrackingPayload)
    {
        _requestParameters = Deserialize<SchoolEtlRequestParameters>(dataLoadRequest.DataExtractorParams);
        WireUpDependencies();

        // This new retriever allows records to be "paged" (e.g. it returns an empty list
        // for pageNum > 0 on the ones that don't have paging.)
        _recordsToProcessRetriever = new SettingBasedRecordsRetriever(
            _propertyRepository,
            _requestParameters.RunType,
            _requestParameters.ResidentialProfileIdOverrides,
            _processorSettings.MaxBatchesToProcess,
            _etlLogger);

        var query = Observable.Range(0, int.MaxValue)
            .Select(pageNum => _recordsToProcessRetriever.GetResProfIdsToProcess(pageNum, _processorSettings.BatchSize))
            .TakeWhile(resProfList => resProfList.Any())
            .SelectMany(records => records)
            .Select(resProf => Observable.Start(() => Task.Run(() => _schoolDataProcessor.ProcessSchoolsAsync(resProf)).Result))
            .Merge(maxConcurrent: _processorSettings.ParallelProperties);

        var subscription = query.Subscribe(async trackingRequests =>
        {
            await CreateRequests(trackingRequests, createTrackingPayload);

            var numberOfAttachments = SumOfRequestType(trackingRequests, TrackingRecordRequestType.AttachSchool);
            var numberOfDetachments = SumOfRequestType(trackingRequests, TrackingRecordRequestType.DetachSchool);
            var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests, TrackingRecordRequestType.UpdateAssignmentType);

            _etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
                numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
        },
        () =>
        {
            _etlLogger.Info("Finished! Woohoo!");
        });
    }

Problems with v3
- The ProcessAsync method still finishes before all of the items have been processed in the background. I'd be fine with that, but in our case, the framework I'm using needs to wait until all of the tracking requests have been created (e.g. until CreateTrackingRequests has been called for each batch of results).
Is it possible to await the operations being completed within this?
Update: Additional information about the problem
In my case, I don't know what is going to produce the observables until run-time. The app is passed a command, which amounts to one of:
- "New records": hits a method that returns the results of a specific sproc
- "Specific record": for testing; hits a method that hits a separate sproc with specific given values
- "All records": hits a method that goes into a continuous paging loop, looping through all 600k records in pages of X (defined by a setting).
The first two scenarios sound like I could pass them right to the observable with no problem. However, the last one seems like I'd have to loop through a bunch of sets of observables, which isn't the behavior I want (I want all 600k items to end up in one large queue and be processed 50 at a time).
My hope is to have one method that "throws things on the queue", and have the processing task continuously pull them in batches of 50.
A note: all of the methods that call the sprocs return the exact same thing -- a list of IThing (obfuscated out of necessity).
I've wired up all of the repository functions, etc. as processor dependencies, so calling ProcessStuffForMyThing(List<IThing>) takes care of the whole process, and it works fine in parallel using the same object (no need to new one up each time).
You have a number of issues with your code that you should fix. The mistakes you're making I've seen many times - everyone seems to go down the same path. It really boils down to changing your thinking from procedural to functional.
To start with, Rx has a lot of operators designed to make your life easier. One of them is Observable.Using. Its job is to spin up a disposable resource, build an observable, and dispose of the resource when the observable completes. It's perfect for reading records from a database.
Your code seems to already have an open database connection and you're pumping records out via a subject. You should avoid having external state (the data processor) and you should avoid using subjects. There's almost always an observable operator that you can use instead.
The other thing you're doing that you shouldn't is mixing your monads - or more specifically, observables and tasks. There are operators in Rx for turning tasks into observables, but they are there for interfacing with existing code and shouldn't be used as a tool within your observables. The rule is to try to get into an observable and stay there until you are ready to subscribe to your data.
I felt your code was a little too fragmented to understand what was being called where, so I wrote a general purpose bit of code that I think covers off on what you need. Here's the query:
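Where the existing code already returns a Task, as ProcessSchoolsAsync in the question appears to, one way to interface with it without blocking on `.Result` is `Observable.FromAsync`. The following is only a minimal sketch under that assumption: `ProcessAsync` here is a hypothetical stand-in for the real processing method, and the class and method names are made up for illustration.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class FromAsyncSketch
{
    // Hypothetical stand-in for an existing Task-returning method.
    static async Task<int> ProcessAsync(int id)
    {
        await Task.Delay(100); // simulate asynchronous work
        return id * 10;
    }

    public static IObservable<int> BuildQuery(int count, int maxConcurrent)
    {
        return Observable
            .Range(0, count)
            // FromAsync is deferred: each task starts only when Merge
            // subscribes to it, so maxConcurrent limits how many run at once.
            .Select(id => Observable.FromAsync(() => ProcessAsync(id)))
            .Merge(maxConcurrent);
    }

    public static void Main()
    {
        // Wait() blocks until the sequence completes. That is acceptable in a
        // console Main, but should be avoided inside async code.
        var results = BuildQuery(8, 4).ToList().Wait();
        Console.WriteLine("Processed " + results.Count + " items.");
    }
}
```

Because each inner observable is created lazily, no thread is parked waiting on a task, which is the main difference from wrapping `Task.Run(...).Result` inside `Observable.Start`.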
    var pageSize = 4;

    Func<Record, Result> process =
        r =>
        {
            Thread.Sleep(100); // Only here to demonstrate the parallelism
            return new Result(r.Id);
        };

    var query =
        Observable
            .Using(
                () => new DataProcessor(),
                dc =>
                    Observable
                        .Range(0, int.MaxValue)
                        .Select(n => dc.GetRecords(n, pageSize))
                        .TakeWhile(rs => rs.Any())
                        .SelectMany(rs => rs)
                        .Select(r => Observable.Start(() => process(r)))
                        .Merge(maxConcurrent: 4));

    var subscription =
        query
            .Subscribe(
                r => Console.WriteLine(r.Id),
                () => Console.WriteLine("Done."));

I've taken some shortcuts with the code, but in essence it's the same as yours (I hope).
This code is runnable if you add in the following classes:
    public class DataProcessor : IDisposable
    {
        public DataProcessor() { Console.WriteLine("Opened."); }
        public void Dispose() { Console.WriteLine("Closed."); }

        // Simulates a paged database read: pages 0 to 5 return records,
        // later pages return an empty array, which ends the query.
        public IEnumerable<Record> GetRecords(int page, int count)
        {
            Console.WriteLine("Reading.");
            Thread.Sleep(100);
            var records =
                page <= 5
                    ? Enumerable
                        .Range(0, count < 5 ? count : count / 2)
                        .Select(x => new Record())
                        .ToArray()
                    : new Record[] { };
            Console.WriteLine("Read.");
            return records;
        }
    }

    public class Record
    {
        private static int __counter = 0;
        public Record() { this.Id = __counter++; }
        public int Id { get; private set; }
    }

    public class Result
    {
        public Result(int id) { this.Id = id; }
        public int Id { get; private set; }
    }

When I run this I get this result:
    Opened.
    Reading.
    Read.
    Reading.
    0
    2
    3
    1
    Read.
    Reading.
    7
    Read.
    5
    6
    4
    Reading.
    10
    11
    9
    8
    Read.
    Reading.
    15
    12
    Read.
    14
    Reading.
    13
    17
    19
    18
    16
    Read.
    Reading.
    21
    Read.
    20
    22
    23
    Done.
    Closed.

You can see that the records are being processed in parallel. You can see the observable completing. You can see the database opening, and then closing once the observable is done.
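Subscribing, as the query does, is fire-and-forget: a method that only calls Subscribe returns before "Done." is printed. If the caller has to wait, as the question's ProcessAsync does, one option is to await the observable itself. The sketch below assumes that both the processing step and the follow-up step return tasks; `ProcessAsync`, `SaveAsync` and `RunAsync` are hypothetical names used only for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AwaitCompletionSketch
{
    // Hypothetical stand-in for the per-record processing step.
    static async Task<int> ProcessAsync(int id)
    {
        await Task.Delay(50);
        return id;
    }

    // Hypothetical stand-in for the async follow-up work that the question
    // performs inside its Subscribe callback.
    static async Task<int> SaveAsync(int result)
    {
        await Task.Delay(10);
        return result;
    }

    public static async Task<int> RunAsync(int count, int maxConcurrent)
    {
        var query = Observable
            .Range(0, count)
            .Select(id => Observable.FromAsync(() => ProcessAsync(id)))
            .Merge(maxConcurrent)
            // Keep the follow-up step inside the query so that completion of
            // the query also covers the saves. Concat runs them one at a time.
            .Select(r => Observable.FromAsync(() => SaveAsync(r)))
            .Concat();

        // Count() always yields exactly one value when the source completes,
        // so awaiting it is safe even for an empty source.
        return await query.Count();
    }

    public static async Task Main()
    {
        var saved = await RunAsync(20, 4);
        Console.WriteLine("Saved " + saved + " results.");
    }
}
```

An `async` lambda passed to Subscribe compiles to an async void callback, so nothing waits for it and its exceptions never reach the observable's error handler. Moving that work into the query avoids both problems. When no aggregate is needed, `await query.LastOrDefaultAsync()` is another way to wait for completion.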
Let me know if this helps.