Thanks Andrew & Kishore 

What you assumed were right, here is the details, I am in the process to 
building a real-time data process pipeline, I like to use kafka+storm. 
Currently, we don’t have a API to stream the data in, I use python-kafka to 
write a producer to pull out 1000000 rows from postgresDB and 
send_messages(self.topic, tup[0]) to kafka, I tried to make things simple, all 
1000000 records go to same topic 

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-topic
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/usr/local/kafka/kafka-0.8.2.1-src/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/local/kafka/kafka-0.8.2.1-src/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Topic:test-topic        PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test-topic       Partition: 0    Leader: 1       Replicas: 1     
Isr: 1

I didn’t make partitions, no replications, (should I do thing this way?) When 
data was pushed into kafka, it is clearly been shuffled, so when I print the 
timestamp in storm bolt, it is like this
2013-03-21 12:01:00-07:00
12445 [ProcessThread(sid:0 cport:-1):] INFO  
org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
KeeperException when processing sessionid:0x14f6af9bafd000f type:create 
cxid:0x3 zxid:0x27 txntype:-1 reqpath:n/a Error 
Path:/test-topic/e803d8e2-500c-413a-8b3a-0273e94840aa Error:KeeperErrorCode = 
NoNode for /test-topic/e803d8e2-500c-413a-8b3a-0273e94840aa
2013-03-21 12:59:00-07:00
2013-03-21 12:58:00-07:00
2013-03-21 12:21:00-07:00
2013-03-21 13:00:00-07:00
2013-03-21 11:47:00-07:00
2013-03-21 13:01:00-07:00
2013-03-21 11:52:00-07:00
2013-03-21 12:02:00-07:00
2013-03-21 13:02:00-07:00
2013-03-21 12:22:00-07:00
2013-03-21 12:23:00-07:00
2013-03-21 13:03:00-07:00
2013-03-21 13:05:00-07:00
2013-03-21 13:04:00-07:00
2013-03-21 12:03:00-07:00
2013-03-21 12:24:00-07:00
2013-03-21 13:07:00-07:00
2013-03-21 13:06:00-07:00
2013-03-21 12:25:00-07:00
2013-03-21 13:08:00-07:00
2013-03-21 13:09:00-07:00
2013-03-21 11:53:00-07:00
2013-03-21 12:26:00-07:00
2013-03-21 12:04:00-07:00

As Kishore said, I need to do group by first, say fieldsGrouping(“measurement”, 
“sensor_id”, 10), so all tuples with same sensor_id will go to same tasks, so I 
assume I wouldn’t worry about groupby here, but as we discussed, I want to sort 
the measurement data from same device based on timestamps associated, 
therefore, I will be able to know if some missed minutes data exist or not, if 
there are missing minutes, I will fill in them by compute the average of last 2 
events. It seems the timestamps are completely out of order, I never use 
trident partition aggregator, I am not sure if it can take on sorting job, but 
my concern if this type of sorting requires how much metadata in State, also 
batch issue makes me confused too, :(


Thanks

AL


> On Aug 25, 2015, at 9:31 PM, Kishore Senji <[email protected]> wrote:
> 
> Agreed. This makes sense if the aggregation is on fields etc. 
> 
> Although Alec did not mention it in this post, based on his previous posts on 
> the same topic, I would assume he is trying to  sort the events because he 
> wanted to "fill in" the missing events (smoothening the curve so to speak) by 
> looking at the previous and next events of the missed timestamp and then do 
> some stream processing on top of it (like for example alerting based on 
> sliding window). Assuming that is the scenario, I guess then he would have to 
> keep more metadata in the State so that he can fill in those events but the 
> question would be when would he stop looking for missing events and fill them 
> and move on (as they can come in different batches), plus he would have to do 
> some stream processing (or store them to ES for later search for example) in 
> the State itself if there is any such processing. This is where I think it 
> gets tricky to do this in the partition aggregator.
> 
> So in our earlier posts we suggested he can do the the appropriate 
> partitioning in Kafka (so that events from a given device ends up in the same 
> partition) and he could do the window based sorting (by buffering few events) 
> in the Stream processing.
> 
> Alec, Please ignore the above if my assumption is not correct.
> 
> 
> On Tue, Aug 25, 2015 at 6:19 PM, Andrew Xor <[email protected] 
> <mailto:[email protected]>> wrote:
> This is not an issue, as that probably would be done through a partition 
> aggregator after the groupBy.
> 
> Kindly yours,
> 
> Andrew Grammenos
> 
> -- PGP PKey --
> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
> https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt 
> <https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt>
> 
> On Wed, Aug 26, 2015 at 4:16 AM, Kishore Senji <[email protected] 
> <mailto:[email protected]>> wrote:
> Interesting. But wouldn't this be impacted by the trident batch size?
> 
> Assuming the batch boundary is like below, after bucketing you would groupBy 
> on the start time (but how would you sort it?) and assumed it can be sorted, 
> we should be done with that batch. so if the batch boundary is like below, 
> you would end up with two different sets of sorts for events which are 
> supposed to be together (12:44, 12:45 & 12:46 below). If I understand the 
> original question, it is how to sort the full stream of events irrespective 
> of how they are processed in batches.
> 
> 2013-03-22 12:43:00-07:00
> 2013-03-22 12:44:00-07:00
> 2013-03-22 12:45:00-07:00
> 2013-03-22 12:49:00-07:00
> 2013-03-22 12:47:00-07:00
> --------------------------------------
> 2013-03-22 12:48:00-07:00
> 2013-03-22 12:46:00-07:00
> 2013-03-22 12:51:00-07:00
> 2013-03-22 12:50:00-07:00
> 2013-03-22 12:52:00-07:00
>  
> 
> 
> 
> On Tue, Aug 25, 2015 at 4:58 PM, Andrew Xor <[email protected] 
> <mailto:[email protected]>> wrote:
> Yes, unless I am missing something... try it and if you have any more 
> problems drop an email.
> 
> Regards.
> 
> Kindly yours,
> 
> Andrew Grammenos
> 
> -- PGP PKey --
> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
> https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt 
> <https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt>
> 
> On Wed, Aug 26, 2015 at 2:46 AM, Alec Lee <[email protected] 
> <mailto:[email protected]>> wrote:
> WoW, that code seems to be exactly I want, will read through, double check, I 
> will still need a partition aggregator to actually sorting after 
> bucketization, right?
> 
> thanks
> 
> 
>> On Aug 25, 2015, at 4:40 PM, Andrew Xor <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Sure, I found this code useful to start with; he does bucketization for 
>> timed intervals in this gist 
>> <https://gist.github.com/codyaray/75533044fc8c0a12fa67>.
>> 
>> Hope this helps.
>> 
>> Kindly yours,
>> 
>> Andrew Grammenos
>> 
>> -- PGP PKey --
>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>> https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt 
>> <https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt>
>> 
>> On Wed, Aug 26, 2015 at 2:36 AM, Alec Lee <[email protected] 
>> <mailto:[email protected]>> wrote:
>> All right, will do trident instead, shameless to ask again, any example code 
>> (particularly for events time sorting) to study? 
>> 
>> thanks
>> 
>> 
>>> On Aug 25, 2015, at 4:31 PM, Andrew Xor <[email protected] 
>>> <mailto:[email protected]>> wrote:
>>> 
>>> Well, if you need to just preserve the order of received (event) tuples 
>>> then why not use trident instead? Trident ensures correct ordering 
>>> (chronologically) as well as exactly once processing without any gimmicks; 
>>> sorting it secondary to the event generation sounds like you will enter 
>>> into quite a bit of hassle for no reason.
>>> 
>>> Regards.
>>> 
>>> Kindly yours,
>>> 
>>> Andrew Grammenos
>>> 
>>> -- PGP PKey --
>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>> https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt 
>>> <https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt>
>>> 
>>> On Wed, Aug 26, 2015 at 2:00 AM, Alec Lee <[email protected] 
>>> <mailto:[email protected]>> wrote:
>>> BTW, I am using spout and bolts, currently not using trident. Thanks
>>> 
>>> 
>>>> On Aug 25, 2015, at 3:47 PM, Andrew Xor <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> 
>>>> What do you mean by that? It's a bit vague as timestamps can have quite 
>>>> high resolution (like for example minutes, seconds, msec) so you will 
>>>> probably have to do a bit of bucketization before sorting them.... then by 
>>>> using a partition aggregator (in Trident at least) you can to this very 
>>>> easily.
>>>> ​​
>>>> Hope this helps.
>>>> 
>>>> Kindly yours,
>>>> 
>>>> Andrew Grammenos
>>>> 
>>>> -- PGP PKey --
>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>> https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt 
>>>> <https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt>
>>>> 
>>>> On Wed, Aug 26, 2015 at 1:37 AM, Alec Lee <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> Hi, all
>>>> 
>>>> is there any sample codes to sort the events in terms of the timestamps 
>>>> field of a tuple?
>>>> 
>>>> thanks
>>>> 
>>>> 
>>>> AL
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> 
> 

Reply via email to