Thanks Andrew & Kishore. What you assumed is right; here are the details. I am building a real-time data-processing pipeline and would like to use Kafka + Storm. We currently have no API to stream the data in, so I used python-kafka to write a producer that pulls 1,000,000 rows out of a PostgreSQL database and calls send_messages(self.topic, tup[0]) for each row. To keep things simple, all 1,000,000 records go to the same topic.
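As context for the partitioning discussion below: Kafka only guarantees ordering within a single partition, so keying each message by sensor_id keeps one device's events together and in arrival order. A minimal stdlib-only sketch of the idea; note that Kafka's Java client actually uses murmur2 for its default partitioner, md5 here is only a stand-in, and partition_for is a hypothetical helper:

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition index. Kafka's default
    partitioner uses murmur2; md5 is used here only to illustrate
    the property that matters: the same key always lands on the
    same partition."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All events of one sensor map to one partition, so a consumer of
# that partition sees them in the order they were produced.
p1 = partition_for("sensor-42", 10)
p2 = partition_for("sensor-42", 10)
```

With kafka-python's KafkaProducer the same effect should come from sending with a key, e.g. producer.send(topic, key=sensor_id.encode(), value=payload), assuming the topic was created with more than one partition.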
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic

    Topic:test-topic  PartitionCount:1  ReplicationFactor:1  Configs:
        Topic: test-topic  Partition: 0  Leader: 1  Replicas: 1  Isr: 1

I didn't create multiple partitions and there is no replication. (Should I be doing it this way?) When the data was pushed into Kafka it clearly got shuffled, because when I print the timestamps in a Storm bolt they come out like this:

2013-03-21 12:01:00-07:00
2013-03-21 12:59:00-07:00
2013-03-21 12:58:00-07:00
2013-03-21 12:21:00-07:00
2013-03-21 13:00:00-07:00
2013-03-21 11:47:00-07:00
2013-03-21 13:01:00-07:00
2013-03-21 11:52:00-07:00
2013-03-21 12:02:00-07:00
2013-03-21 13:02:00-07:00
2013-03-21 12:22:00-07:00
2013-03-21 12:23:00-07:00
2013-03-21 13:03:00-07:00
2013-03-21 13:05:00-07:00
2013-03-21 13:04:00-07:00
2013-03-21 12:03:00-07:00
2013-03-21 12:24:00-07:00
2013-03-21 13:07:00-07:00
2013-03-21 13:06:00-07:00
2013-03-21 12:25:00-07:00
2013-03-21 13:08:00-07:00
2013-03-21 13:09:00-07:00
2013-03-21 11:53:00-07:00
2013-03-21 12:26:00-07:00
2013-03-21 12:04:00-07:00

As Kishore said, I need to do a group-by
first, say fieldsGrouping("measurement", "sensor_id", 10), so that all tuples with the same sensor_id go to the same task; I assume I then don't have to worry about the group-by itself. But, as we discussed, I want to sort the measurement data from a given device by its timestamps, so that I can tell whether any minutes of data are missing; if there are, I will fill them in by computing the average of the last two events. The timestamps arrive completely out of order. I have never used a Trident partition aggregator, so I am not sure whether it can take on the sorting job. My concern is how much metadata this kind of sorting would require in the State, and the batching behaviour confuses me too. :(

Thanks
AL

> On Aug 25, 2015, at 9:31 PM, Kishore Senji <[email protected]> wrote:
>
> Agreed. This makes sense if the aggregation is on fields etc.
>
> Although Alec did not mention it in this post, based on his previous posts on the same topic, I would assume he is trying to sort the events because he wanted to "fill in" the missing events (smoothening the curve, so to speak) by looking at the previous and next events of the missed timestamp and then do some stream processing on top of it (for example, alerting based on a sliding window). Assuming that is the scenario, I guess he would then have to keep more metadata in the State so that he can fill in those events, but the question would be when he should stop looking for missing events, fill them in, and move on (as they can come in different batches); plus he would have to do some stream processing (or store them to ES for later search, for example) in the State itself if there is any such processing. This is where I think it gets tricky to do this in the partition aggregator.
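The fill-in step described above (sort one sensor's events, then patch each missing minute with the average of the last two readings) can be sketched in plain Python. fill_missing_minutes is a hypothetical helper, and repeating the first reading when only one exists so far is an assumption, not something from the thread:

```python
from datetime import datetime, timedelta

def fill_missing_minutes(events):
    """Sort one sensor's (timestamp, value) events by time and fill any
    missing minute with the average of the two most recent readings.
    Timestamps are assumed to be at minute resolution."""
    events = sorted(events)                     # order by timestamp
    filled = []
    for ts, value in events:
        # emit synthetic readings until the gap to the next event closes
        while filled and ts - filled[-1][0] > timedelta(minutes=1):
            prev_ts, prev_val = filled[-1]
            # average of the last two emitted readings (repeat the last
            # one if only a single reading exists so far -- an assumption)
            older_val = filled[-2][1] if len(filled) >= 2 else prev_val
            filled.append((prev_ts + timedelta(minutes=1),
                           (prev_val + older_val) / 2.0))
        filled.append((ts, value))
    return filled

events = [
    (datetime(2013, 3, 21, 12, 2), 8.0),   # arrives out of order
    (datetime(2013, 3, 21, 12, 0), 4.0),
    (datetime(2013, 3, 21, 12, 1), 6.0),
    # 12:03 is missing from the stream
    (datetime(2013, 3, 21, 12, 4), 9.0),
]
series = fill_missing_minutes(events)      # 12:03 filled with (8+6)/2 = 7.0
```

This only answers the per-buffer part; Kishore's question below, about when to stop waiting for a missing event that may arrive in a later batch, is a policy decision this sketch does not address.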
>
> So in our earlier posts we suggested he can do the appropriate partitioning in Kafka (so that events from a given device end up in the same partition) and he could do the window-based sorting (by buffering a few events) in the stream processing.
>
> Alec, please ignore the above if my assumption is not correct.
>
> On Tue, Aug 25, 2015 at 6:19 PM, Andrew Xor <[email protected]> wrote:
> This is not an issue, as that probably would be done through a partition aggregator after the groupBy.
>
> Kindly yours,
>
> Andrew Grammenos
>
> -- PGP PKey --
> https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt
> https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt
> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>
> On Wed, Aug 26, 2015 at 4:16 AM, Kishore Senji <[email protected]> wrote:
> Interesting. But wouldn't this be impacted by the Trident batch size?
>
> Assuming the batch boundary is like below: after bucketing you would groupBy on the start time (but how would you sort it?), and assuming it can be sorted, we would be done with that batch. So if the batch boundary falls as below, you would end up with two different sorted sets for events which are supposed to be together (12:44, 12:45 & 12:46 below). If I understand the original question, it is how to sort the full stream of events irrespective of how they are processed in batches.
>
> 2013-03-22 12:43:00-07:00
> 2013-03-22 12:44:00-07:00
> 2013-03-22 12:45:00-07:00
> 2013-03-22 12:49:00-07:00
> 2013-03-22 12:47:00-07:00
> --------------------------------------
> 2013-03-22 12:48:00-07:00
> 2013-03-22 12:46:00-07:00
> 2013-03-22 12:51:00-07:00
> 2013-03-22 12:50:00-07:00
> 2013-03-22 12:52:00-07:00
>
> On Tue, Aug 25, 2015 at 4:58 PM, Andrew Xor <[email protected]> wrote:
> Yes, unless I am missing something... try it and if you have any more problems drop an email.
>
> Regards.
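Kishore's batch-boundary example is the classic case for the "window-based sorting by buffering a few events" he mentions: hold up to N events in a min-heap keyed on timestamp and release the oldest once the buffer is full. That re-sorts a stream that is only slightly out of order, independent of where batch boundaries fall (provided no event arrives more than N positions late). ReorderBuffer is an illustrative stdlib sketch, not a Trident API:

```python
import heapq

class ReorderBuffer:
    """Bounded reorder buffer: keeps up to `capacity` events in a
    min-heap ordered by timestamp, and emits the oldest buffered
    event whenever the buffer overflows."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.heap = []

    def push(self, ts, event):
        heapq.heappush(self.heap, (ts, event))
        if len(self.heap) > self.capacity:
            return heapq.heappop(self.heap)   # oldest event, in order
        return None                           # still buffering

    def drain(self):
        """Flush remaining events in timestamp order at end of stream."""
        while self.heap:
            yield heapq.heappop(self.heap)

# arrival order from the batch-boundary example above (minutes only);
# note 12:46 arrives after 12:48, in the "second batch"
arrivals = ["12:43", "12:44", "12:45", "12:49", "12:47",
            "12:48", "12:46", "12:51", "12:50", "12:52"]

buf = ReorderBuffer(capacity=4)
out = []
for ts in arrivals:
    released = buf.push(ts, ts)
    if released is not None:
        out.append(released)
out.extend(buf.drain())
# out is now in timestamp order, 12:43 .. 12:52
```

The trade-off is latency (each event waits up to `capacity` arrivals before being emitted) against tolerance for disorder, which is the same tension as Kishore's "when do you stop waiting for a missing event" question.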
>
> Kindly yours,
>
> Andrew Grammenos
>
> On Wed, Aug 26, 2015 at 2:46 AM, Alec Lee <[email protected]> wrote:
> WoW, that code seems to be exactly what I want; I will read through and double-check. I will still need a partition aggregator to actually do the sorting after bucketization, right?
>
> thanks
>
>> On Aug 25, 2015, at 4:40 PM, Andrew Xor <[email protected]> wrote:
>>
>> Sure, I found this code useful to start with; he does bucketization for timed intervals in this gist: https://gist.github.com/codyaray/75533044fc8c0a12fa67
>>
>> Hope this helps.
>>
>> Kindly yours,
>>
>> Andrew Grammenos
>>
>> On Wed, Aug 26, 2015 at 2:36 AM, Alec Lee <[email protected]> wrote:
>> All right, I will use Trident instead. Shameless to ask again: any example code (particularly for event-time sorting) to study?
>>
>> thanks
>>
>>> On Aug 25, 2015, at 4:31 PM, Andrew Xor <[email protected]> wrote:
>>>
>>> Well, if you just need to preserve the order of received (event) tuples, then why not use Trident instead? Trident ensures correct (chronological) ordering as well as exactly-once processing without any gimmicks; sorting secondary to the event generation sounds like you will enter into quite a bit of hassle for no reason.
>>>
>>> Regards.
>>>
>>> Kindly yours,
>>>
>>> Andrew Grammenos
>>>
>>> On Wed, Aug 26, 2015 at 2:00 AM, Alec Lee <[email protected]> wrote:
>>> BTW, I am using spouts and bolts; I am currently not using Trident. Thanks
>>>
>>>> On Aug 25, 2015, at 3:47 PM, Andrew Xor <[email protected]> wrote:
>>>>
>>>> What do you mean by that? It's a bit vague, as timestamps can have quite high resolution (for example minutes, seconds, msec), so you will probably have to do a bit of bucketization before sorting them... then, by using a partition aggregator (in Trident at least), you can do this very easily.
>>>>
>>>> Hope this helps.
>>>>
>>>> Kindly yours,
>>>>
>>>> Andrew Grammenos
>>>>
>>>> On Wed, Aug 26, 2015 at 1:37 AM, Alec Lee <[email protected]> wrote:
>>>> Hi, all
>>>>
>>>> Is there any sample code to sort events by the timestamp field of a tuple?
>>>>
>>>> thanks
>>>>
>>>> AL
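The bucketization Andrew suggests (and the linked gist implements for Trident in Java) amounts to flooring each timestamp to the start of a fixed-width window and grouping on that key. A stdlib-only Python sketch of the same idea; bucket_start is a hypothetical helper, and the 5-minute width is an arbitrary choice:

```python
from datetime import datetime

def bucket_start(ts: datetime, minutes: int = 5) -> datetime:
    """Floor a timestamp to the start of its fixed-width time bucket.
    Grouping events on this key (e.g. in a Trident groupBy) collects
    everything from the same interval together before sorting."""
    return ts.replace(minute=ts.minute - ts.minute % minutes,
                      second=0, microsecond=0)

evt = datetime(2013, 3, 21, 12, 44, 37)
bucket = bucket_start(evt)        # 2013-03-21 12:40:00
```

Changing `minutes` (or adding an hour floor) gives coarser windows; sorting then only has to happen within each bucket rather than across the whole stream.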
