python - Asynchronous RabbitMQ consumer with aioamqp -


i'm trying write asynchronous consumer using asyncio/aioamqp. problem is, callback coroutine (below) blocking. set channel basic_consume(), , assign callback callback(). callback has "yield asyncio.sleep" statement (to simulate "work"), takes integer publisher , sleeps amount of time before printing message.

if published 2 messages, 1 time of "10", followed 1 time of "1", expected second message print first, since has shorter sleep time. instead, callback blocks 10 seconds, prints first message, , prints second.

it appears either basic_consume, or callback, blocking somewhere. there way handled?

@asyncio.coroutine def callback(body, envelope, properties):     yield asyncio.sleep(int(body))     print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))  @asyncio.coroutine def receive_log():     try:         transport, protocol = yield aioamqp.connect('localhost', 5672, login="login", password="password")     except:         print("closed connections")         return      channel = yield protocol.channel()     exchange_name = 'cloudstack-events'     exchange_name = 'test-async-exchange'     queue_name = 'async-queue-%s' % random.randint(0, 10000)     yield channel.exchange(exchange_name, 'topic', auto_delete=true, passive=false, durable=false)     yield asyncio.wait_for(channel.queue(queue_name, durable=false, auto_delete=true), timeout=10)      binding_keys = ['mykey']      binding_key in binding_keys:         print("binding", binding_key)         yield asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name,                                                        queue_name=queue_name,                                                        routing_key=binding_key), timeout=10)      print(' [*] waiting logs. exit press ctrl+c')     yield channel.basic_consume(queue_name, callback=callback)  loop = asyncio.get_event_loop() loop.create_task(receive_log()) loop.run_forever() 

for interested, figured out way this. i'm not sure if it's best practice, it's accomplishing need.

rather "work" (in case, async.sleep) inside callback, create new task on loop, , schedule separate co-routine run do_work(). presumably working, because it's freeing callback() return immediately.

i loaded few hundred events in rabbit different sleep timers, , interleaved when printed code below. seems working. hope helps someone!

@asyncio.coroutine def do_work(envelope, body):     yield asyncio.sleep(int(body))     print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))  @asyncio.coroutine def callback(body, envelope, properties):     loop = asyncio.get_event_loop()     loop.create_task(do_work(envelope, body))  @asyncio.coroutine def receive_log():     try:         transport, protocol = yield aioamqp.connect('localhost', 5672, login="login", password="password")     except:         print("closed connections")         return      channel = yield protocol.channel()     exchange_name = 'cloudstack-events'     exchange_name = 'test-async-exchange'     queue_name = 'async-queue-%s' % random.randint(0, 10000)     yield channel.exchange(exchange_name, 'topic', auto_delete=true, passive=false, durable=false)     yield asyncio.wait_for(channel.queue(queue_name, durable=false, auto_delete=true), timeout=10)      binding_keys = ['mykey']      binding_key in binding_keys:         print("binding", binding_key)         yield asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name,                                                        queue_name=queue_name,                                                        routing_key=binding_key), timeout=10)      print(' [*] waiting logs. exit press ctrl+c')     yield channel.basic_consume(queue_name, callback=callback)  loop = asyncio.get_event_loop() loop.create_task(receive_log()) loop.run_forever() 

Comments