Python - Fastest and most efficient way to insert and update a lot of rows in Cassandra using cqlengine


I have a CSV file with 15,000,000 records that I'm trying to process into a Cassandra table. Here's an example of the column headers and data:

[Image: sample of the CSV column headers and data]

To make more sense of it, here is the model in Python:

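    # Imports assume the cqlengine bundled with the DataStax Python driver;
    # the legacy standalone package uses `from cqlengine import columns` instead.
    from cassandra.cqlengine import columns
    from cassandra.cqlengine.models import Model

    class DIDSummary(Model):
        __keyspace__ = 'processor_api'

        # (did, month) form the partition key; direction is a clustering column.
        did = columns.Text(required=True, primary_key=True, partition_key=True)
        month = columns.DateTime(required=True, primary_key=True, partition_key=True)
        direction = columns.Text(required=True, primary_key=True)
        duration = columns.Counter(required=True)
        cost = columns.Counter(required=True)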

Right now I'm trying to process the data in each row of the CSV file and insert it in batches of 500, 1000, 10000, 250, etc. All of them give about the same timing (about 0.33 seconds per 1000 rows, which means it will take roughly 90 minutes to get through all of them). I tried using a multiprocessing pool and apply_async()'ing each batch.execute() call, with no better results. Is there a way I could, in Python, use SSTableWriter, or something else, to insert them into Cassandra faster? For reference, here's my process_sheet_row() method:

    def process_sheet_row(self, row, batch):
        report_datetime = '{0}{1:02d}'.format(self.report.report_year, self.report.report_month)
        # Counters hold integers, so the values are scaled before being stored.
        duration = int(float(row[self.columns['duration']]) * 10)
        cost = int(float(row[self.columns['cost']]) * 100000)

        # Counter update for the calling number ("from" direction).
        anisummary = DIDSummary.batch(batch).create(did='{}{}'.format(self.report.ani_country_code, row[self.columns['ani']]),
                                                    direction='from',
                                                    month=datetime.datetime.strptime(report_datetime, '%Y%m'))
        anisummary.duration += duration
        anisummary.cost += cost
        anisummary.batch(batch).save()

        # Counter update for the destination number ("to" direction).
        destsummary = DIDSummary.batch(batch).create(did='{}{}'.format(self.report.dest_country_code, row[self.columns['dest']]),
                                                     direction='to',
                                                     month=datetime.datetime.strptime(report_datetime, '%Y%m'))
        destsummary.duration += duration
        destsummary.cost += cost
        destsummary.batch(batch).save()

Any help is appreciated. Thanks!

Edit: here is the code that goes through the file and processes it:

    with open(self.path) as csvfile:
        reader = csv.DictReader(csvfile)
        if arr[0] == 'inventory':
            self.parse_inventory(reader)
        b = BatchQuery(batch_type=BatchType.Unlogged)
        i = 1
        for row in reader:
            self.parse_sheet_row(row, b)
            if not i % 1000:
                connection.check_connection()  # makes sure we're still connected to Cassandra; see the code below
                self.pool.apply_async(b.execute())
                b = BatchQuery(batch_type=BatchType.Unlogged)
            i += 1
    print "done processing: {}".format(self.path)
    print "time to execute: {}".format(datetime.datetime.now() - start)
    print "batches: {}".format(i / 1000)
    print "records processed: {}".format(i - 1)

And because it might be a bit of help, here's the connection.check_connection() method (and the surrounding methods):

    def setup_defaults():
        connection.setup(['127.0.0.1'], 'processor_api', lazy_connect=True)

    def check_connection():
        from cdr.models import DIDSummary
        try:
            # A cheap query; if it fails, the connection is set up again.
            DIDSummary.objects.all().count()
        except CQLEngineException:
            setup_defaults()

Batches, in general, aren't the fastest method for performing insertions, especially unlogged batches containing various partitions. More reading on batches here.
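To see how many partitions a batch of 1000 rows would span, one option is to group the pending updates by partition key before sending anything. The sketch below is only an illustration: `group_by_partition` is a hypothetical helper, and the tuple layout `(did, month, direction, duration, cost)` is an assumption based on the model in the question, where `(did, month)` is the partition key.

```python
from collections import defaultdict


def group_by_partition(updates):
    """Group (did, month, direction, duration, cost) tuples by (did, month).

    Hypothetical helper: (did, month) is the partition key of the model in
    the question, so each returned group touches exactly one partition.
    """
    groups = defaultdict(list)
    for did, month, direction, duration, cost in updates:
        groups[(did, month)].append((direction, duration, cost))
    return dict(groups)


# Made-up sample data: three updates spread over two partitions.
sample = [
    ('15551234', '2015-03', 'from', 15, 25000),
    ('15551234', '2015-03', 'to', 20, 30000),
    ('445559876', '2015-03', 'to', 15, 25000),
]
partitions = group_by_partition(sample)
```

If the number of groups is close to the number of rows, as is likely when most rows have distinct phone numbers, each batch fans out to many partitions and the batch itself buys very little.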

If you can pull away from cqlengine for the insertion, you should try the async callback chaining implemented in the Python driver under cassandra.concurrent.execute_concurrent.

I've had major improvements in inserts/sec after moving to this method, having previously misused batches of various sizes. YMMV.
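A minimal sketch of that approach, using `execute_concurrent_with_args` from `cassandra.concurrent` with a prepared counter update. Several things here are assumptions rather than details from the question: the table name `processor_api.did_summary` (check what cqlengine actually generated for the model), the CSV header names `ani`, `dest`, `duration` and `cost`, and the helper names `row_params`, `chunked` and `load_csv`.

```python
import csv
from itertools import islice

# Assumed table name; verify the name cqlengine created for the model.
UPDATE_CQL = (
    "UPDATE processor_api.did_summary "
    "SET duration = duration + ?, cost = cost + ? "
    "WHERE did = ? AND month = ? AND direction = ?"
)


def row_params(row, month, ani_country_code, dest_country_code):
    """Turn one CSV row into the two counter updates made in the question."""
    duration = int(float(row['duration']) * 10)
    cost = int(float(row['cost']) * 100000)
    return [
        (duration, cost, ani_country_code + row['ani'], month, 'from'),
        (duration, cost, dest_country_code + row['dest'], month, 'to'),
    ]


def chunked(iterable, size):
    """Yield lists of at most `size` items so results do not pile up in memory."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def load_csv(session, csvfile, month, ani_cc, dest_cc,
             chunk_size=10000, concurrency=100):
    """Stream the CSV into Cassandra with concurrent prepared updates."""
    # Imported here so the helpers above can be used without the driver installed.
    from cassandra.concurrent import execute_concurrent_with_args

    prepared = session.prepare(UPDATE_CQL)
    params = (p for row in csv.DictReader(csvfile)
              for p in row_params(row, month, ani_cc, dest_cc))
    for chunk in chunked(params, chunk_size):
        # Raises on the first failed statement (raise_on_first_error defaults to True).
        execute_concurrent_with_args(session, prepared, chunk,
                                     concurrency=concurrency)
```

The `session` would come from something like `Cluster(['127.0.0.1']).connect()` in `cassandra.cluster`, and `month` would be a `datetime.datetime`. The `chunk_size` and `concurrency` values are starting points to tune, not recommendations.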

