python - How to process multiple commands to read from socket at the same time in Tornado asynchronous TCP? -
My TCP server is built with Tornado's asynchronous TCP support. The client is written in C.
server code:
#! /usr/bin/env python
# coding=utf-8
"""Minimal Tornado TCP server that greets every client that connects."""
from tornado.tcpserver import TCPServer
from tornado.ioloop import IOLoop


class tcpconnection(object):
    """Wrap one accepted client stream and exchange newline-terminated messages."""

    def __init__(self, stream, address):
        """Store the stream and peer address, then send the two greetings."""
        self._stream = stream
        self._address = address
        self._stream.set_close_callback(self.on_close)
        # NOTE: send_message() also starts a read_until(). Calling it twice in
        # a row starts a second read while the first one is still pending,
        # which is what raises "Already reading" in the traceback below.
        self.send_message(b'hello \n')
        self.send_message(b'world \n')

    def read_message(self):
        """Ask the stream for the next newline-terminated message."""
        self._stream.read_until(b'\n', self.handle_message)

    def handle_message(self, data):
        """Print one message received from the client."""
        print(data)

    def send_message(self, data):
        """Write ``data`` to the client, then start waiting for its reply."""
        self._stream.write(data)
        self.read_message()

    def on_close(self):
        """Report that the client behind this connection has disconnected."""
        # The format string was never applied before (the address was passed
        # as a second print() argument), and %d cannot format an address;
        # presumably the address is a (host, port) tuple, so %s is used.
        print("the monitored %s has left" % (self._address,))


class monitorserver(TCPServer):
    """TCP server that creates a tcpconnection for every accepted client."""

    def handle_stream(self, stream, address):
        """Log the new connection and hand its stream to a tcpconnection."""
        print("new connection", address, stream)
        tcpconnection(stream, address)


if __name__ == '__main__':
    print('server start .....')
    server = monitorserver()
    server.listen(20000)
    IOLoop.instance().start()

# client code:
#include <winsock2.h>
#include <stdio.h>
#include <string.h>

#pragma comment(lib, "ws2_32.lib")

/* System description that the client sends to the server as one record. */
typedef struct syteminit {
    char computer[32];
    char user[32];
    char os[256];
    char processor[256];
    char mem[128];
    char disk[128];
} systeminit;

/* Command record; used here as the receive buffer for the server's data. */
typedef struct command {
    int commandtype;
    char commandinfo[256];
} command;

/*
 * Connect to the server, read its first data, send one systeminit record
 * and disconnect.  Returns 0 on success and 1 on any Winsock failure.
 */
int main(void)
{
    int err;
    int status = 1;
    systeminit message;
    command recvbuf;
    SOCKET sockclient;
    SOCKADDR_IN addrserver;
    WSADATA wsadata;
    WORD wversionrequested;

    wversionrequested = MAKEWORD(2, 2);
    err = WSAStartup(wversionrequested, &wsadata);
    if (err != 0) {
        return 1;
    }
    if (LOBYTE(wsadata.wVersion) != 2 || HIBYTE(wsadata.wVersion) != 2) {
        WSACleanup();
        return 1;
    }

    sockclient = socket(AF_INET, SOCK_STREAM, 0);
    if (sockclient == INVALID_SOCKET) {
        WSACleanup();
        return 1;
    }

    memset(&addrserver, 0, sizeof(addrserver));
    addrserver.sin_addr.S_un.S_addr = inet_addr("172.16.110.1");
    addrserver.sin_family = AF_INET;
    addrserver.sin_port = htons(20000);

    if (connect(sockclient, (SOCKADDR *)&addrserver,
                sizeof(addrserver)) == SOCKET_ERROR) {
        fprintf(stderr, "connect failed: %d\n", WSAGetLastError());
        goto cleanup;
    }

    /* Never read more than the receive buffer can hold. */
    if (recv(sockclient, (char *)&recvbuf, sizeof(recvbuf), 0) == SOCKET_ERROR) {
        fprintf(stderr, "recv failed: %d\n", WSAGetLastError());
        goto cleanup;
    }

    /* Zero the record first so no uninitialised stack bytes go on the wire. */
    memset(&message, 0, sizeof(message));
    strcpy(message.computer, "zz-pc");
    strcpy(message.disk, "zz-disk");
    strcpy(message.mem, "zz-men");
    strcpy(message.os, "zz-os");
    strcpy(message.processor, "zz-processor");
    strcpy(message.user, "zz-user");

    /* Send exactly sizeof(message); "+ 1" read one byte past the struct. */
    if (send(sockclient, (char *)&message, sizeof(message), 0) == SOCKET_ERROR) {
        fprintf(stderr, "send failed: %d\n", WSAGetLastError());
        goto cleanup;
    }
    status = 0;

cleanup:
    closesocket(sockclient);
    WSACleanup();
    return status;
}

/* I get the following error when I execute them: */
ERROR:tornado.application:Error in connection callback
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/tornado/tcpserver.py", line 269, in _handle_connection
    future = self.handle_stream(stream, address)
  File "/home/zz/pycharmprojects/monitor/test.py", line 34, in handle_stream
    tcpconnection(stream,address)
  File "/home/zz/pycharmprojects/monitor/test.py", line 15, in __init__
    self.send_message(b'world \n')
  File "/home/zz/pycharmprojects/monitor/test.py", line 25, in send_message
    self.read_message()
  File "/home/zz/pycharmprojects/monitor/test.py", line 18, in read_message
    self._stream.read_until(b'\n', self.handle_message)
  File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 270, in read_until
    future = self._set_read_callback(callback)
  File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 658, in _set_read_callback
    assert self._read_callback is None, "Already reading"
AssertionError: Already reading
I guess the error occurs because self.send_message(b'hello \n') and self.send_message(b'world \n') read from the socket at the same time. How can I solve this?
this code of yours:
self.send_message(b'hello \n') self.send_message(b'world \n') results in the following:
self._stream.write(b'hello \n') self._stream.read_until(b'\n', self.handle_message) self._stream.write(b'world \n') self._stream.read_until(b'\n', self.handle_message) Since you're calling read_until with a callback, you're trying to do both read_untils in parallel at the same time. That's nonsense, though, because they're coming one after the other on the TCP connection. You have to first read one message, and then read the other message.
I feel that using gen.coroutine makes this easier. You can also do it with callbacks; I'll show how later.
using gen.coroutine
Here's how I'd change the tcpconnection class to use coroutines:
from tornado import gen  # needed for the @gen.coroutine decorators below


class tcpconnection(object):
    """One client connection whose conversation is driven by a coroutine."""

    def __init__(self, stream, address):
        """Store the stream and peer address and register the close callback."""
        self._stream = stream
        self._address = address
        self._stream.set_close_callback(self.on_close)

    @gen.coroutine
    def send_messages(self):
        """Send each greeting and wait for its reply before sending the next."""
        yield self.send_message(b'hello \n')
        response1 = yield self.read_message()
        print(response1)

        yield self.send_message(b'world \n')
        # The result can also be received in-line, but the yield expression
        # then needs to be wrapped in ( ):
        print((yield self.read_message()))

    def read_message(self):
        """Return what read_until() returns for the next newline-terminated message."""
        return self._stream.read_until(b'\n')

    def send_message(self, data):
        """Return what write() returns for ``data`` so the caller can yield it."""
        return self._stream.write(data)

    def on_close(self):
        """Report that the client behind this connection has disconnected."""
        # The format string was never applied before (the address was passed
        # as a second print() argument), and %d cannot format an address;
        # presumably the address is a (host, port) tuple, so %s is used.
        print("the monitored %s has left" % (self._address,))


class monitorserver(TCPServer):
    """TCP server that runs the greeting conversation for every client."""

    @gen.coroutine
    def handle_stream(self, stream, address):
        """Log the new connection and run its conversation to completion."""
        print("new connection", address, stream)
        conn = tcpconnection(stream, address)
        yield conn.send_messages()

# By using coroutines, you can write the code in the order you want it to
# execute in, and you can read a response as a return value into a local
# variable, instead of having to use a handler method. Whenever you yield
# something, you're pausing to wait for it to finish.
I separated send_message() and read_message(), because I think that makes it clearer. If you think it's better to keep them together in send_message(), you can do this:
@gen.coroutine
def send_message(self, data):
    """Write ``data`` to the stream, then return the peer's next message."""
    yield self._stream.write(data)
    # The reader method on tcpconnection is read_message(); this snippet
    # previously called receive_message(), which none of the shown code defines.
    return (yield self.read_message())

# If instead you wanted to send both messages first, and then wait to receive
# both responses, you could do that:
@gen.coroutine
def send_messages(self):
    """Send both greetings first, then read and print the two replies."""
    for greeting in (b'hello \n', b'world \n'):
        yield self.send_message(greeting)

    # One reply is expected per greeting; read them in arrival order.
    for _ in range(2):
        reply = yield self.read_message()
        print(reply)

# using callbacks
Anything you can code with coroutines, you can also code with callbacks. What you need to do, however, is keep track of your state (where you're at) between callbacks. This can be done by jumping around between different callbacks. For example:
def read_message(self, callback):
    """Start reading one newline-terminated message; pass it to ``callback``."""
    self._stream.read_until(b'\n', callback=callback)


def send_message(self, data, callback):
    """Write ``data``, then wait for the reply and pass it to ``callback``."""
    self._stream.write(data)
    self.read_message(callback)


def send_first_message(self):
    """Step 1: send the first greeting; its reply goes to step 2."""
    self.send_message(b'hello \n', self.receive_first_response)


def receive_first_response(self, data):
    """Step 2: print the first reply and send the second greeting."""
    print(data)
    self.send_message(b'world \n', self.receive_second_response)


def receive_second_response(self, data):
    """Step 3: print the second reply; the exchange ends here."""
    print(data)

# Or use some other way of keeping track of where you are in the
# communication, such as storing it in a field of the class instance.
Comments
Post a Comment