scala - How to implement receiveAvailable transducer in scalaz-stream


Short version:

I would like to implement a function that returns a transducer that waits for a block of values to be "emitted".

The function I have in mind would have the following signature:

    /**
     * A `Process1` that awaits the next "effect" to occur and passes all values emitted by
     * that effect to `rcv` to determine the next state.
     */
    def receiveBlock[I, O](rcv: Vector[I] => Process1[I, O]): Process1[I, O] = ???

Details:

My understanding is that I could then use this function to implement the following function, which I think would be quite useful:

    /**
     * Groups inputs into chunks of dynamic size based on the various effects
     * that back the emitted values.
     *
     * @example {{{
     * val numberTask = Task.delay(1)
     * val listOfNumbersTask = Task.delay(List(5,6,7))
     * val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
     * sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
     * }}}
     */
    def chunkByEffect[I]: Process1[I, Vector[I]] = {
      receiveBlock(vec => emit(vec) ++ chunkByEffect)
    }

[Update] More details

My ultimate objective (slightly simplified) is to implement the following function:

    /**
     * Transforms a stream of audio into a stream of text.
     */
    def voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]

The function makes an external call to a voice recognition service, so it is unreasonable to make a network call for every single Byte in the stream. I need to chunk bytes together before making a network call. I could make audio a Process[Task, ByteVector], but that would require the testing code to know the maximum chunk size the function supports; I would rather have that managed by the function itself. Also, when this service is used inside another service, that service will itself be receiving network calls carrying a given size of audio, so I would like the chunkXXX function to be smart about chunking and not hold onto data that is already available.

Basically, the stream of audio coming from the network will have the form Process[Task, ByteVector] and will be translated into a Process[Task, Byte] by flatMap(Process.emitAll(_)). However, the test code will directly produce a Process[Task, Byte] and feed that into voiceRecognition. In theory, I believe it should be possible, given the appropriate combinator, to provide an implementation of voiceRecognition that does the right thing with both of these streams, and I think the chunkByEffect function described above is the key to that. I now realize that I would need chunkByEffect to take min and max parameters that specify the minimum and maximum chunk size, irrespective of the underlying Task producing the bytes.
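To make the min/max idea concrete, here is a minimal sketch of the buffering rule in plain Scala, with no scalaz-stream dependency. The names `Rechunk` and `rechunk` are hypothetical, and the policy is an assumption about what is wanted: never emit more than `max` elements in one chunk, and hold back a trailing piece only while it is shorter than `min`.

```scala
object Rechunk {
  /** Combines the carried-over `buffer` with newly arrived `incoming` elements.
    * Returns the chunks that can be released now and the buffer to carry forward.
    * (Hypothetical helper; the emission policy is an assumption, not library behaviour.)
    */
  def rechunk[A](buffer: Vector[A], incoming: Vector[A], min: Int, max: Int): (Vector[Vector[A]], Vector[A]) = {
    require(min >= 1 && min <= max, "need 1 <= min <= max")
    val all = buffer ++ incoming
    // Cut off as many full `max`-sized chunks as possible.
    val fullCount = all.length / max
    val (fullPart, rest) = all.splitAt(fullCount * max)
    val full = fullPart.grouped(max).toVector
    // The remainder is shorter than `max`; release it only once it reaches `min`.
    if (rest.length >= min) (full :+ rest, Vector.empty)
    else (full, rest)
  }
}
```

A rule like this could serve as the state transition inside a hand-written `Process1`, with the returned buffer passed to the next step and flushed when the input ends.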

You need to have your bytes separated somehow. I suggest working with a higher-level abstraction over the stream of bytes, i.e. ByteVector.

Then you will probably have to write a manual process1, implemented similarly to process1.chunkBy but operating on ByteVector, i.e.

    def chunkBy(separator: ByteVector): Process1[ByteVector, ByteVector] = {
      def go(acc: ByteVector): Process1[ByteVector, ByteVector] =
        // On end of input, emit whatever has been accumulated so far.
        receive1Or[ByteVector, ByteVector](emit(acc)) { bytes =>
          // implement searching of separator in accumulated + new bytes
          ???
        }
      go(ByteVector.empty)
    }
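The separator search left as `???` above could look roughly like the following. This is only a sketch of the splitting logic: it uses a plain `Vector[Byte]` as a stand-in for `ByteVector` so that it runs without extra libraries, and the names `SeparatorSplit` and `splitOnSeparator` are hypothetical.

```scala
object SeparatorSplit {
  /** Splits `buffer` (accumulated bytes plus newly received bytes) on every
    * occurrence of `separator`. Returns the complete chunks found and the
    * trailing remainder that no separator has terminated yet.
    */
  def splitOnSeparator(buffer: Vector[Byte], separator: Vector[Byte]): (Vector[Vector[Byte]], Vector[Byte]) = {
    require(separator.nonEmpty, "separator must not be empty")
    @annotation.tailrec
    def loop(rest: Vector[Byte], done: Vector[Vector[Byte]]): (Vector[Vector[Byte]], Vector[Byte]) = {
      val idx = rest.indexOfSlice(separator)
      if (idx < 0) (done, rest) // no further separator: keep `rest` as the new accumulator
      else loop(rest.drop(idx + separator.length), done :+ rest.take(idx))
    }
    loop(buffer, Vector.empty)
  }
}
```

Inside `go`, the complete chunks would be emitted and the remainder passed on as the next `acc`.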

Then hook it all together:

    val speech: Process[Task, ByteVector] = ???
    def chunkByWhatever: Process1[ByteVector, ByteVector] = ???
    val recognizer: Channel[Task, ByteVector, String] = ???

    // this shall do the trick
    speech.pipe(chunkByWhatever).through(recognizer)
