I'm trying to write an asynchronous consumer using asyncio/aioamqp. My problem is that the callback coroutine (below) is blocking. I set up a channel with basic_consume() and assign callback() as the callback. The callback has a "yield from asyncio.sleep" statement (to simulate "work"), which takes an integer from the publisher and sleeps for that amount of time before printing the message.
If I publish two messages, one with a time of "10" followed by one with a time of "1", I expect the second message to print first, since it has the shorter sleep time. Instead, the callback blocks for 10 seconds, prints the first message, and then prints the second.
It appears that either basic_consume or the callback is blocking somewhere. Is there another way this could be handled?
    import asyncio
    import random

    import aioamqp


    # Generator-based coroutine style; asyncio.coroutine was removed in Python 3.11.
    @asyncio.coroutine
    def callback(body, envelope, properties):
        # Simulate "work": sleep for the number of seconds sent by the publisher.
        yield from asyncio.sleep(int(body))
        print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))


    @asyncio.coroutine
    def receive_log():
        try:
            transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password")
        except Exception:
            print("closed connections")
            return

        channel = yield from protocol.channel()
        exchange_name = 'test-async-exchange'
        queue_name = 'async-queue-%s' % random.randint(0, 10000)

        yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False)
        yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10)

        binding_keys = ['mykey']
        for binding_key in binding_keys:
            print("binding", binding_key)
            yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name,
                                                           queue_name=queue_name,
                                                           routing_key=binding_key), timeout=10)

        print(' [*] Waiting for logs. To exit press CTRL+C')
        yield from channel.basic_consume(queue_name, callback=callback)


    loop = asyncio.get_event_loop()
    loop.create_task(receive_log())
    loop.run_forever()
For those interested, I figured out a way to do this. I'm not sure if it's best practice, but it accomplishes what I need.
Rather than doing the "work" (in this case, asyncio.sleep) inside the callback, I create a new task on the loop and schedule a separate coroutine, do_work(), to run. Presumably this works because it frees callback() to return immediately.
I loaded a few hundred events into Rabbit with different sleep timers, and they were interleaved when printed by the code below. So it seems to be working. Hope this helps someone!
    import asyncio
    import random

    import aioamqp


    # Generator-based coroutine style; asyncio.coroutine was removed in Python 3.11.
    @asyncio.coroutine
    def do_work(envelope, body):
        # The actual "work" now runs in its own task, outside the callback.
        yield from asyncio.sleep(int(body))
        print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))


    @asyncio.coroutine
    def callback(body, envelope, properties):
        # Schedule the work and return immediately so the next message can be delivered.
        loop = asyncio.get_event_loop()
        loop.create_task(do_work(envelope, body))


    @asyncio.coroutine
    def receive_log():
        try:
            transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password")
        except Exception:
            print("closed connections")
            return

        channel = yield from protocol.channel()
        exchange_name = 'test-async-exchange'
        queue_name = 'async-queue-%s' % random.randint(0, 10000)

        yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False)
        yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10)

        binding_keys = ['mykey']
        for binding_key in binding_keys:
            print("binding", binding_key)
            yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name,
                                                           queue_name=queue_name,
                                                           routing_key=binding_key), timeout=10)

        print(' [*] Waiting for logs. To exit press CTRL+C')
        yield from channel.basic_consume(queue_name, callback=callback)


    loop = asyncio.get_event_loop()
    loop.create_task(receive_log())
    loop.run_forever()
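For anyone who wants to see the difference without a broker, here is a minimal sketch using only asyncio. It assumes the consumer awaits each callback before handing over the next delivery, which is what the behaviour in the question suggests; the dispatch_awaiting and dispatch_with_tasks functions are hypothetical stand-ins, not aioamqp's actual internals. It uses async/await syntax so it runs on current Python versions, and the delays are shortened to fractions of a second.

```python
import asyncio


async def handle(delay, label, done):
    # Stand-in for the "work": sleep, then record the completion order.
    await asyncio.sleep(delay)
    done.append(label)


async def dispatch_awaiting(messages):
    # Hypothetical dispatcher: awaits each callback before the next message.
    done = []
    for delay, label in messages:
        await handle(delay, label, done)
    return done


async def dispatch_with_tasks(messages):
    # Hypothetical dispatcher: the callback only schedules a task and returns.
    done = []
    tasks = [asyncio.create_task(handle(delay, label, done))
             for delay, label in messages]
    # Wait for all scheduled work so the completion order can be inspected.
    await asyncio.gather(*tasks)
    return done


messages = [(0.2, "slow"), (0.05, "fast")]
sequential_order = asyncio.run(dispatch_awaiting(messages))
task_order = asyncio.run(dispatch_with_tasks(messages))
print(sequential_order)  # ['slow', 'fast']
print(task_order)        # ['fast', 'slow']
```

In the first case the short sleep cannot start until the long one has finished, which matches what I saw originally. In the second case both sleeps run concurrently, so the shorter one finishes first.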