python - Fastest and most efficient way to insert and update a lot of rows in Cassandra using cqlengine
I have a CSV file with 15,000,000 records that I'm trying to process into a Cassandra table. Here's an example of the column headers and data:

To make more sense of it, here's my model in Python:
    class didsummary(Model):
        __keyspace__ = 'processor_api'

        # (did, month) is the composite partition key; direction is a clustering column.
        did = columns.Text(required=True, primary_key=True, partition_key=True)
        month = columns.DateTime(required=True, primary_key=True, partition_key=True)
        direction = columns.Text(required=True, primary_key=True)

        duration = columns.Counter(required=True)
        cost = columns.Counter(required=True)

Right now I'm trying to process the data in each row of the CSV file and insert the rows in batches of 500, 1000, 10000, 250, etc. All sizes give about the same timing (about 0.33 seconds per 1000 rows, which means it will take about 90 minutes to get through all of them). I also tried taking a multiprocessing pool and apply_async()'ing each batch.execute() call, with no better results. Is there a way I could, in Python, use SSTableWriter, or do something else to insert them into Cassandra faster? For reference, here's my process_sheet_row() method:
    def process_sheet_row(self, row, batch):
        report_datetime = '{0}{1:02d}'.format(self.report.report_year, self.report.report_month)
        # Counters are integers, so the values are scaled before being added.
        duration = int(float(row[self.columns['duration']]) * 10)
        cost = int(float(row[self.columns['cost']]) * 100000)

        anisummary = didsummary.batch(batch).create(
            did='{}{}'.format(self.report.ani_country_code, row[self.columns['ani']]),
            direction='from',
            month=datetime.datetime.strptime(report_datetime, '%Y%m'))
        anisummary.duration += duration
        anisummary.cost += cost
        anisummary.batch(batch).save()

        destsummary = didsummary.batch(batch).create(
            did='{}{}'.format(self.report.dest_country_code, row[self.columns['dest']]),
            direction='to',
            month=datetime.datetime.strptime(report_datetime, '%Y%m'))
        destsummary.duration += duration
        destsummary.cost += cost
        destsummary.batch(batch).save()

Any help is appreciated. Thanks!
Edit: here's the code that goes through the file and processes it:
    with open(self.path) as csvfile:
        reader = csv.DictReader(csvfile)
        if arr[0] == 'inventory':
            self.parse_inventory(reader)
        b = BatchQuery(batch_type=BatchType.Unlogged)
        i = 1
        for row in reader:
            self.parse_sheet_row(row, b)
            if not i % 1000:
                # Makes sure we're still connected to Cassandra; see the code below.
                connection.check_connection()
                self.pool.apply_async(b.execute())
                b = BatchQuery(batch_type=BatchType.Unlogged)
            i += 1
    print "Done processing: {}".format(self.path)
    print "Time to execute: {}".format(datetime.datetime.now() - start)
    print "Batches: {}".format(i / 1000)
    print "Records processed: {}".format(i - 1)

And because it might be a bit of help, here's the connection.check_connection() method (and surrounding methods):
    def setup_defaults():
        connection.setup(['127.0.0.1'], 'processor_api', lazy_connect=True)

    def check_connection():
        from cdr.models import didsummary
        try:
            # A cheap query used only to detect a dropped connection.
            didsummary.objects.all().count()
        except CQLEngineException:
            setup_defaults()
Batches, in general, aren't the fastest method for performing insertions, especially unlogged batches containing various partitions. There is more reading on batches here.
If you can pull away from cqlengine for the insertion, you should try the async callback chaining implemented in the Python driver under cassandra.concurrent.execute_concurrent.
I've had major improvements in inserts/sec after moving to this method, having previously misused batches of various sizes. YMMV.
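For illustration, here is a minimal sketch of what that could look like with execute_concurrent_with_args, the variant of execute_concurrent that runs one prepared statement against many parameter tuples. Several things in it are assumptions rather than details from the question: the table name `did_summary` (cqlengine derives the real name from the model class), the CSV keys `duration`, `cost`, `ani` and `dest` being used directly, and the helper names `report_month`, `row_to_params` and `insert_rows`. It needs an already-connected driver `Session`, so treat it as a starting point rather than a drop-in replacement.

```python
import datetime

# Counter columns can only be changed with UPDATE ... SET c = c + ?.
# The table name below is an assumption; check what cqlengine created.
UPDATE_CQL = (
    "UPDATE processor_api.did_summary "
    "SET duration = duration + ?, cost = cost + ? "
    "WHERE did = ? AND month = ? AND direction = ?"
)


def report_month(year, month):
    """Build the month key the same way the question does ('%Y%m')."""
    return datetime.datetime.strptime('{0}{1:02d}'.format(year, month), '%Y%m')


def row_to_params(row, month, ani_prefix, dest_prefix):
    """Turn one CSV row (a dict) into two counter-update parameter tuples."""
    duration = int(float(row['duration']) * 10)
    cost = int(float(row['cost']) * 100000)
    return [
        (duration, cost, ani_prefix + row['ani'], month, 'from'),
        (duration, cost, dest_prefix + row['dest'], month, 'to'),
    ]


def insert_rows(session, rows, month, ani_prefix, dest_prefix, concurrency=100):
    """Send the updates for a chunk of rows concurrently; return any errors."""
    # Imported here so the helpers above work without the driver installed.
    from cassandra.concurrent import execute_concurrent_with_args

    prepared = session.prepare(UPDATE_CQL)
    params = []
    for row in rows:
        params.extend(row_to_params(row, month, ani_prefix, dest_prefix))

    results = execute_concurrent_with_args(
        session, prepared, params,
        concurrency=concurrency, raise_on_first_error=False)
    # Each result is a (success, result_or_exception) pair.
    return [outcome for success, outcome in results if not success]
```

With 15,000,000 rows you would probably call `insert_rows` on chunks of the CSV reader (a few thousand rows at a time) instead of building one giant parameter list, and tune `concurrency` against your own cluster.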