I'm pulling stock data down from an API I have access to. I'm doing it in the following steps:
- Loop through a list of symbols/stocks one by one
- Create a socket connection and send the relevant message to the API
- Receive the data and separate it into lines until "!endmsg!" is received, at which point the data for that symbol is complete
- Convert the data (a string) to a StringIO, read it into a pandas DataFrame, and write the data to SQL
- Do the next symbol/stock

Relevant code snippet:
```python
import io
import socket

import pandas as pd


def readlines(sock, recv_buffer=4096, delim='\n'):
    buffer = ''
    while True:
        data = sock.recv(recv_buffer)
        buffer += data.decode('latin-1')
        while delim in buffer:
            line, buffer = buffer.split(delim, 1)
            yield line


def main():
    syms = ['msft', 'aapl', 'gs', 'f']
    for sym in syms:
        # host and port are defined elsewhere
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
        data = ''
        message = sym  # + relevant API-specific commands (omitted)
        sock.sendall(message.encode())
        for line in readlines(sock):
            if "!endmsg!" in line:
                break
            data += line + '\n'
        sock.close()
        data = io.StringIO(data)
        df = pd.read_csv(data)
        df.to_sql(...)
```

I'd like to incorporate threading so I don't have to do one stock at a time. I'm not sure where or how to implement locks so I don't risk getting data for the wrong stocks, into the wrong variables, etc.
This is what I have so far:

```python
import io
import socket
import threading
from queue import Queue

import pandas as pd

q = Queue()
my_lock = threading.Lock()


def readlines(sock, recv_buffer=4096, delim='\n'):
    buffer = ''
    while True:
        data = sock.recv(recv_buffer)
        buffer += data.decode('latin-1')
        while delim in buffer:
            line, buffer = buffer.split(delim, 1)
            yield line


def get_symbol_data(sym, sock):
    with my_lock:
        data = ''
        message = sym  # + relevant API-specific commands (omitted)
        sock.sendall(message.encode())
        for line in readlines(sock):
            if "!endmsg!" in line:
                break
            data += line + '\n'
        data = io.StringIO(data)
        df = pd.read_csv(data)
        df.to_sql(...)


def threader():
    while True:
        sym, sock = q.get()
        get_symbol_data(sym, sock)
        q.task_done()


def main():
    # host and port are defined elsewhere
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))

    # create 4 threads
    for x in range(4):
        t = threading.Thread(target=threader)
        t.daemon = True
        t.start()

    syms = ['msft', 'aapl', 'gs', 'f']
    for sym in syms:
        q.put((sym, sock))

    q.join()
    sock.close()
```

My attempt at incorporating threading hangs. No errors, nothing; it just hangs. Can anyone point me in the right direction?
I'm not sure if I'm using the lock in the right place. By the way, if I don't use the lock, the program still hangs. Presumably it should still run to completion, even if the data gets jumbled from not using locks?
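One thing worth ruling out before worrying about locks: `readlines` as written never gives up. If the server closes the connection, `recv` keeps returning `b''` and the loop spins; if the server simply never answers (for example because it does not accept a second request on the same connection), `recv` blocks forever. Either way the symptom is a hang with no error. Below is a minimal sketch, assuming the same `!endmsg!` framing, that turns both cases into exceptions using `sock.settimeout`. The `read_until_endmsg` helper and the 10-second default are hypothetical choices, not part of the original code.

```python
import socket


def readlines(sock, recv_buffer=4096, delim="\n"):
    """Yield complete lines from sock; stop cleanly if the peer closes."""
    buffer = ""
    while True:
        data = sock.recv(recv_buffer)  # raises socket.timeout if a timeout is set
        if not data:  # b"" means the peer closed the connection
            if buffer:
                yield buffer  # flush a final line that had no trailing delimiter
            return
        buffer += data.decode("latin-1")
        while delim in buffer:
            line, buffer = buffer.split(delim, 1)
            yield line


def read_until_endmsg(sock, end_marker="!endmsg!", timeout=10.0):
    """Collect lines until end_marker; fail loudly instead of hanging."""
    sock.settimeout(timeout)
    lines = []
    for line in readlines(sock):
        if end_marker in line:
            return "\n".join(lines) + "\n"
        lines.append(line)
    raise ConnectionError("connection closed before " + end_marker)


if __name__ == "__main__":
    # Simulate a server that answers once and then goes quiet.
    client, server = socket.socketpair()
    server.sendall(b"a,b\n1,2\n!endmsg!\n")
    print(repr(read_until_endmsg(client, timeout=1.0)))
    try:
        read_until_endmsg(client, timeout=1.0)  # nothing more arrives
    except socket.timeout:
        print("second read timed out instead of hanging")
    client.close()
    server.close()
```

If a timeout like this fires on the second symbol, the hang is about the API and the shared connection rather than about where the lock sits.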
Here are my 2 cents:
- What is the lock supposed to do? Each thread has to wait for the lock before receiving data, which is not efficient, since the network operation is the only thing that would benefit from being parallelized.
- Create a socket in each thread. That way, you don't need to synchronize access to the socket, and you can maybe get rid of the locks completely. Alternatively, use a socket pool.
- I'm not sure how you're storing the data, but you might need synchronization between writers when updating a pandas DataFrame. You mention SQL; the database takes care of that for you. Another option is to have the API/socket readers report their data to a second type of thread (or the main thread) that collects it and writes it to storage.
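The second and third suggestions combined can be sketched like this: each worker opens its own connection, and only the main thread parses and stores the results. Everything API-related here is a hypothetical stand-in: the request is just the symbol plus a newline (the real API-specific commands were omitted in the question), the reply is CSV ending in a `!endmsg!` line, and a tiny `socketserver` handler plays the API. To keep it self-contained, the standard library's `csv` and `sqlite3` replace pandas and `to_sql`; with pandas, the main thread would call `pd.read_csv(io.StringIO(text))` and `df.to_sql(...)` at the same spot.

```python
import csv
import io
import socket
import socketserver
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

END = "!endmsg!"


def fetch_symbol(host, port, sym):
    """Worker: open a private connection, request one symbol, return raw CSV."""
    with socket.create_connection((host, port), timeout=10) as sock:
        # Hypothetical request format; the real API-specific commands go here.
        sock.sendall((sym + "\n").encode())
        buffer, lines = "", []
        while True:
            data = sock.recv(4096)
            if not data:
                raise ConnectionError(f"{sym}: connection closed before {END}")
            buffer += data.decode("latin-1")
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                if END in line:
                    return sym, "\n".join(lines) + "\n"
                lines.append(line)


def fetch_all(host, port, syms, db, workers=4):
    """Fetch in parallel; only the calling thread touches the database."""
    db.execute("CREATE TABLE IF NOT EXISTS quotes (symbol TEXT, price REAL)")
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(fetch_symbol, host, port, s) for s in syms]
        for fut in futures:
            sym, text = fut.result()  # re-raises any worker exception here
            rows = list(csv.DictReader(io.StringIO(text)))
            db.executemany(
                "INSERT INTO quotes VALUES (?, ?)",
                [(sym, float(r["price"])) for r in rows],
            )
    db.commit()


class FakeApiHandler(socketserver.StreamRequestHandler):
    """Hypothetical stand-in for the real API: one symbol per connection."""

    def handle(self):
        sym = self.rfile.readline().decode().strip()
        reply = f"symbol,price\n{sym},{len(sym)}.0\n{END}\n"
        self.wfile.write(reply.encode())


if __name__ == "__main__":
    server = socketserver.ThreadingTCPServer(("127.0.0.1", 0), FakeApiHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    host, port = server.server_address
    db = sqlite3.connect(":memory:")
    fetch_all(host, port, ["MSFT", "AAPL", "GS", "F"], db)
    print(db.execute("SELECT * FROM quotes ORDER BY symbol").fetchall())
    server.shutdown()
    server.server_close()
```

Only the worker threads touch the network, each on its own connection, so no lock is needed there. The database is written from the main thread only, which also sidesteps the fact that a `sqlite3` connection, by default, refuses to be used from a thread other than the one that created it.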
All of the above assumes that network operations are the reason you want to parallelize in the first place. You mentioned in a comment that you reuse the socket for all symbols. I don't know how the API works, but it seems to me that this would require the symbols to be collected serially.
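If the API does accept several requests per connection, the socket-pool alternative can be sketched with a `queue.Queue` of open connections: a thread borrows one, does a full request/response on it, and hands it back, so no two threads ever interleave on the same socket. This is a generic sketch; `ConnectionPool` is a hypothetical name, and the demo uses plain integers in place of sockets.

```python
import queue
import threading
from contextlib import contextmanager


class ConnectionPool:
    """Hand out at most `size` connections; each is used by one thread at a time."""

    def __init__(self, factory, size):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(factory())

    @contextmanager
    def connection(self, timeout=None):
        # Blocks until a connection is free; raises queue.Empty after `timeout`.
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always hand it back, even on error

    def close_all(self, close):
        """Apply `close` to every idle connection and empty the pool."""
        while True:
            try:
                close(self._idle.get_nowait())
            except queue.Empty:
                return


if __name__ == "__main__":
    # Dummy "connections" (plain ints) stand in for sockets here.
    ids = iter(range(100))
    pool = ConnectionPool(lambda: next(ids), size=2)
    in_use, clashes, guard = set(), [], threading.Lock()

    def worker():
        for _ in range(200):
            with pool.connection() as conn:
                with guard:
                    if conn in in_use:
                        clashes.append(conn)  # would mean two threads share one
                    in_use.add(conn)
                with guard:
                    in_use.discard(conn)

    threads = [threading.Thread(target=worker) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("clashes:", clashes)
    pool.close_all(lambda conn: None)
```

With real sockets, the factory would be something like `lambda: socket.create_connection((host, port))`, and each worker would do its `sendall` and read-until-`!endmsg!` inside `with pool.connection() as sock:`. If the API only handles one request per connection, a pool does not help and a fresh connection per symbol is the way to go.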