I'm trying to count the number of each user's original tweets with Storm, after storing the tweets I downloaded in a MongoDB database. However, whenever I count an author's original tweets using the following code, it keeps reading (and counting) the same tweet.
bolt:
public class calculatethemetrics extends basebasicbolt { map<string,double>ot1=new hashmap<string, double>(); @override public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields("username","ot1")); } @override public void execute(tuple input,basicoutputcollector collector) { string author=input.getstring(0); string tweet=input.getstring(2); double ot1=this.ot1.get(author); if(ot1==null){ ot1=0.0; } if(author!=null && tweet!=null ){ if(!tweet.startswith("@") || !tweet.startswith("rt")){ ot1+=1; } this.ot1.put(author,ot1); system.out.println(author+" +ot1); collector.emit(new values(author,ot1)) } } topology:
/*
 * Builds and runs the tweet-metrics topology.
 *
 * Wiring:
 *  - spout "read_tweet_data_from_mongodb" is a readlinesfromtextfile over "tweets.txt"
 *    (despite its name, it reads a text file here), shuffle-grouped into
 *    "tweet_data_from_mongodb_to_further_processing".
 *  - spout "read_the_authors_from_text_file" is a readlastlinefromtextfile over "authors.txt",
 *    shuffle-grouped into "from_the_authors_text_file_to_further_processing".
 *  - "search_for_the_authors_tweet_data" (parallelism 16) receives both streams, fields-grouped
 *    on ("username", "id"); presumably both upstream bolts declare those fields. TODO confirm.
 *  - "calculate_the_metrics" (calculatethemetrics, parallelism 64) is fields-grouped on
 *    "username", so all tuples for one username reach the same task and its per-author map.
 *
 * With a non-empty args array, the topology is submitted to a cluster under the name args[0],
 * with 10 workers, 5 ackers and max spout pending 100. Otherwise it runs in a LocalCluster as
 * "test" for 1*60*60*1000 ms (one hour), then it is killed and the cluster is shut down.
 * The local branch sets none of the worker/acker/pending options.
 *
 * NOTE(review): if readlinesfromtextfile re-emits lines it has already sent (for example by
 * re-reading the file on each nextTuple call), the same tweet reaches calculate_the_metrics
 * repeatedly. Check that spout before blaming the counting bolt.
 * NOTE(review): this snippet appears to have been lower-cased (string, exception,
 * topologybuilder, ...); restore the original Java identifier casing before compiling.
 */
public class theauthorsandtheirtweetdata { public static void main(string[]args) throws exception{ topologybuilder topologybuilder=new topologybuilder(); topologybuilder.setspout("read_tweet_data_from_mongodb", new readlinesfromtextfile("tweets.txt")); topologybuilder.setbolt("tweet_data_from_mongodb_to_further_processing",new frommongodbtoprocessing()).shufflegrouping("read_tweet_data_from_mongodb"); topologybuilder.setspout("read_the_authors_from_text_file",new readlastlinefromtextfile("authors.txt")); topologybuilder.setbolt("from_the_authors_text_file_to_further_processing", new fromtheauthorstextfiletofurtherprocessing()).shufflegrouping("read_the_authors_from_text_file"); topologybuilder.setbolt("search_for_the_authors_tweet_data",new searchfortheauthorstweetdata(),16).fieldsgrouping("tweet_data_from_mongodb_to_further_processing",new fields("username","id")).fieldsgrouping("from_the_authors_text_file_to_further_processing",new fields("username","id")); topologybuilder.setbolt("calculate_the_metrics",new calculatethemetrics(),64).fieldsgrouping("search_for_the_authors_tweet_data",new fields("username")); config config=new config(); if(args!=null && args.length>0){ config.setnumworkers(10); config.setnumackers(5); config.setmaxspoutpending(100); stormsubmitter.submittopology(args[0], config, topologybuilder.createtopology()); }else{ localcluster localcluster=new localcluster(); localcluster.submittopology("test",config,topologybuilder.createtopology()); utils.sleep(1*60*60*1000); localcluster.killtopology("test"); localcluster.shutdown(); } } }
What I want is for it to stop repeatedly reading the same tweet, and to stop counting the same tweet more than once. Please help.
Something like this?
public class calculate1metric extends baserichbolt { private outputcollector collector; map<string ,integer>ot1; @override public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields("username","ot1")); } @override public void prepare(map stormconf, topologycontext context, outputcollector collector) { this.collector=collector; this.ot1=new hashmap<string, integer>(); } @override public void execute(tuple input) { final string sourcecomponent = input.getsourcecomponent(); string author = input.getstring(0); string tweet = input.getstring(2); if (author != null && tweet != null) { integer ot1 = this.ot1.get(author); if (ot1 == null) { ot1 = 0; } if (!tweet.startswith("@") || !tweet.contains("rt ") || !tweet.startswith("rt")) { ot1 += 1; } if(!this.ot1.containskey(author)) { this.ot1.put(author, ot1); }else{ collector.emit(new values(author,ot1,ot2)); system.out.println(author + " " + ot1+" "+ot2); this.ot1.remove(author); } }else{ collector.fail(input); } collector.ack(input); }
Comments
Post a Comment