Hi Andrew,
I got a Trident topology working today. The topology looks like this:
topology.newStream("spoutInit", kafkaSpout)
    .each(new Fields("str"),
          new JsonObjectParse(),
          new Fields("sensor_id",
                     "period",
                     "powermon_num",
                     "current",
                     "id",
                     "measurement_timestamp",
                     "measurement_dateuploaded"))
    .parallelismHint(6)
    .groupBy(new Fields("sensor_id"))
    .each(new Fields("sensor_id", "measurement_timestamp"),
          new PrintStream(),
          new Fields("sensorid", "timestamps"));
I am able to print the sensor_id and timestamp of each tuple:
[febc0061, 2013-03-14 19:23:00-07:00]
[febc0061, 2013-03-14 19:22:00-07:00]
[febc0061, 2013-03-14 19:21:00-07:00]
[febc0061, 2013-03-14 18:42:00-07:00]
[febc0061, 2013-03-14 18:59:00-07:00]
[febc0061, 2013-03-14 19:20:00-07:00]
[febc0061, 2013-03-14 18:39:00-07:00]
[febc0061, 2013-03-14 18:41:00-07:00]
[febc0061, 2013-03-14 18:40:00-07:00]
[febc0061, 2013-03-14 18:58:00-07:00]
[febc0061, 2013-03-14 18:49:00-07:00]
[febc0061, 2013-03-14 18:44:00-07:00]
[febc0061, 2013-03-14 18:43:00-07:00]
[febc0061, 2013-03-14 18:48:00-07:00]
[febc0061, 2013-03-14 18:46:00-07:00]
[febc0061, 2013-03-14 18:45:00-07:00]
[febc0061, 2013-03-14 18:47:00-07:00]
[febc0061, 2013-03-14 18:52:00-07:00]
[febc0061, 2013-03-14 18:50:00-07:00]
[febc0061, 2013-03-14 18:57:00-07:00]
[febc0061, 2013-03-14 18:51:00-07:00]
[febc0061, 2013-03-14 18:56:00-07:00]
[febc0061, 2013-03-14 18:53:00-07:00]
[febc0061, 2013-03-14 18:55:00-07:00]
[febc0061, 2013-03-14 18:54:00-07:00]
[febc0061, 2013-03-14 19:19:00-07:00]
[febc0061, 2013-03-14 19:03:00-07:00]
[febc0061, 2013-03-14 19:12:00-07:00]
I have been poking around but wasn't able to find any hint on how to use a
partition aggregator to sort within a batch. I thought the sorting would be
done locally in each partition after the groupBy, so that each partition holds
multiple groups, each containing sorted records. How do I do that within a
batch? In addition, do I need a reduce step at the end to combine all the
partitions?
Thanks
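
To make the question concrete, here is a minimal sketch of the per-group sort
I have in mind. It is plain Java, not Trident code: the class name BatchSorter
and the {sensor_id, timestamp} array layout are made up for illustration. In
the topology, logic like this would presumably sit inside an aggregator applied
after the groupBy, which only sees one batch at a time, so the result is sorted
within a batch and not across batches.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Storm): groups one batch of readings by
// sensor and sorts each group's timestamps chronologically.
final class BatchSorter {
    // Matches timestamps such as "2013-03-14 19:23:00-07:00".
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssXXX");

    // Each reading is a two-element array: {sensorId, timestamp}.
    static Map<String, List<String>> sortBatch(List<String[]> batch) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] reading : batch) {
            groups.computeIfAbsent(reading[0], k -> new ArrayList<>())
                  .add(reading[1]);
        }
        // Compare by instant so differing UTC offsets would still order correctly.
        for (List<String> timestamps : groups.values()) {
            timestamps.sort(Comparator.comparing(
                    (String ts) -> OffsetDateTime.parse(ts, FORMAT).toInstant()));
        }
        return groups;
    }
}
```

Calling BatchSorter.sortBatch on the tuples of one batch returns a map from
sensor id to that sensor's timestamps in ascending order.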
> On Aug 26, 2015, at 11:11 AM, Andrew Xor <[email protected]> wrote:
>
> I don't think it's easier or harder to learn... both have pros and cons.
> In your case, the semantics you are trying to apply in your particular
> scenario sound more like a use case for a Trident-based topology, that's all.
>
> Regards.
>
> Kindly yours,
>
> Andrew Grammenos
>
> -- PGP PKey --
> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
> https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt
> <https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt>
>
> On Wed, Aug 26, 2015 at 8:35 PM, Alec Lee <[email protected]
> <mailto:[email protected]>> wrote:
> For this type of problem, it seems Trident is better than spout+bolts, even
> though the latter is easier to understand and learn?
>
> AL
>> On Aug 25, 2015, at 9:31 PM, Kishore Senji <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> Agreed. This makes sense if the aggregation is on fields etc.
>>
>> Although Alec did not mention it in this post, based on his previous posts
>> on the same topic, I would assume he is trying to sort the events because
>> he wants to "fill in" the missing events (smoothing the curve, so to
>> speak) by looking at the events before and after the missed timestamp,
>> and then do some stream processing on top of that (for example, alerting
>> based on a sliding window). Assuming that is the scenario, he would have to
>> keep more metadata in the State so that he can fill in those events. The
>> question then is when he should stop looking for missing events, fill them
>> in, and move on, since they can arrive in different batches. He would also
>> have to do any further stream processing (or store the events to ES for
>> later search, for example) in the State itself. This is where I think it
>> gets tricky to do this in the partition aggregator.
>>
>> So in our earlier posts we suggested he do the appropriate partitioning in
>> Kafka (so that events from a given device end up in the same partition) and
>> then do the window-based sorting (by buffering a few events) in the stream
>> processing.
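
A rough sketch of that buffering idea, in plain Java rather than Storm code.
The class name ReorderBuffer, the capacity parameter, and the use of epoch
milliseconds are all assumptions for illustration. Because the buffer lives
across batch boundaries, events split over two batches can still come out in
order, as long as no event arrives more than `capacity` positions late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper (not part of Storm): re-orders slightly out-of-order
// events by holding back the most recent ones in a small min-heap.
final class ReorderBuffer {
    private final int capacity;
    private final PriorityQueue<Long> pending = new PriorityQueue<>();

    ReorderBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Adds one event timestamp (epoch millis) and returns the events that are
    // released as a result: the oldest ones once more than `capacity` are held.
    List<Long> offer(long timestamp) {
        pending.add(timestamp);
        List<Long> released = new ArrayList<>();
        while (pending.size() > capacity) {
            released.add(pending.poll());
        }
        return released;
    }

    // Releases everything still buffered, oldest first (for example at shutdown).
    List<Long> flush() {
        List<Long> released = new ArrayList<>();
        while (!pending.isEmpty()) {
            released.add(pending.poll());
        }
        return released;
    }
}
```

An event that arrives later than the buffer can absorb is still emitted, just
out of order, so the capacity has to be chosen from how late events can be.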
>>
>> Alec, Please ignore the above if my assumption is not correct.
>>
>>
>> On Tue, Aug 25, 2015 at 6:19 PM, Andrew Xor <[email protected]
>> <mailto:[email protected]>> wrote:
>> This is not an issue, as that probably would be done through a partition
>> aggregator after the groupBy.
>>
>> Kindly yours,
>>
>> Andrew Grammenos
>>
>> On Wed, Aug 26, 2015 at 4:16 AM, Kishore Senji <[email protected]
>> <mailto:[email protected]>> wrote:
>> Interesting. But wouldn't this be impacted by the trident batch size?
>>
>> Assuming the batch boundary is as shown below, after bucketing you would
>> groupBy on the start time (but how would you sort it?). Assuming it can be
>> sorted, we would then be done with that batch. So with the batch boundary
>> below, you would end up with two separate sorted sets for events that are
>> supposed to be together (12:44, 12:45 & 12:46 below). If I understand the
>> original question, it is how to sort the full stream of events irrespective
>> of how they are processed in batches.
>>
>> 2013-03-22 12:43:00-07:00
>> 2013-03-22 12:44:00-07:00
>> 2013-03-22 12:45:00-07:00
>> 2013-03-22 12:49:00-07:00
>> 2013-03-22 12:47:00-07:00
>> --------------------------------------
>> 2013-03-22 12:48:00-07:00
>> 2013-03-22 12:46:00-07:00
>> 2013-03-22 12:51:00-07:00
>> 2013-03-22 12:50:00-07:00
>> 2013-03-22 12:52:00-07:00
>>
>>
>>
>>
>> On Tue, Aug 25, 2015 at 4:58 PM, Andrew Xor <[email protected]
>> <mailto:[email protected]>> wrote:
>> Yes, unless I am missing something... try it and if you have any more
>> problems drop an email.
>>
>> Regards.
>>
>> Kindly yours,
>>
>> Andrew Grammenos
>>
>> On Wed, Aug 26, 2015 at 2:46 AM, Alec Lee <[email protected]
>> <mailto:[email protected]>> wrote:
>> Wow, that code seems to be exactly what I want. I will read through it and
>> double-check. I will still need a partition aggregator to do the actual
>> sorting after bucketization, right?
>>
>> thanks
>>
>>
>>> On Aug 25, 2015, at 4:40 PM, Andrew Xor <[email protected]
>>> <mailto:[email protected]>> wrote:
>>>
>>> Sure, I found this code useful to start with; he does bucketization for
>>> timed intervals in this gist
>>> <https://gist.github.com/codyaray/75533044fc8c0a12fa67>.
>>>
>>> Hope this helps.
>>>
>>> Kindly yours,
>>>
>>> Andrew Grammenos
>>>
>>> On Wed, Aug 26, 2015 at 2:36 AM, Alec Lee <[email protected]
>>> <mailto:[email protected]>> wrote:
>>> All right, I will use Trident instead. Sorry to ask again, but is there any
>>> example code (particularly for sorting events by time) to study?
>>>
>>> thanks
>>>
>>>
>>>> On Aug 25, 2015, at 4:31 PM, Andrew Xor <[email protected]
>>>> <mailto:[email protected]>> wrote:
>>>>
>>>> Well, if you just need to preserve the order of received (event) tuples,
>>>> then why not use Trident instead? Trident ensures correct ordering
>>>> (chronologically) as well as exactly-once processing without any gimmicks;
>>>> sorting the events after they have been generated sounds like it will get
>>>> you into quite a bit of hassle for no reason.
>>>>
>>>> Regards.
>>>>
>>>> Kindly yours,
>>>>
>>>> Andrew Grammenos
>>>>
>>>> On Wed, Aug 26, 2015 at 2:00 AM, Alec Lee <[email protected]
>>>> <mailto:[email protected]>> wrote:
>>>> BTW, I am using spout and bolts, currently not using trident. Thanks
>>>>
>>>>
>>>>> On Aug 25, 2015, at 3:47 PM, Andrew Xor <[email protected]
>>>>> <mailto:[email protected]>> wrote:
>>>>>
>>>>> What do you mean by that? It's a bit vague, as timestamps can have quite
>>>>> high resolution (for example minutes, seconds, msec), so you will
>>>>> probably have to do a bit of bucketization before sorting them. Then,
>>>>> by using a partition aggregator (in Trident at least), you can do this
>>>>> very easily.
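
As an illustration of what bucketization could look like, here is a small
plain-Java sketch that maps a timestamp to the start of its fixed-width time
bucket. The class name TimeBuckets, the bucket width, and the timestamp format
(taken from the sample data in this thread) are assumptions, and this is not
the approach of any particular library.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of Storm): assigns timestamps to fixed-width
// buckets so that events can be grouped by bucket before sorting.
final class TimeBuckets {
    // Matches timestamps such as "2013-03-22 12:43:00-07:00".
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssXXX");

    // Returns the start of the bucket containing the timestamp, in epoch
    // seconds. The width must be at least one second.
    static long bucketStart(String timestamp, Duration width) {
        long epochSeconds = OffsetDateTime.parse(timestamp, FORMAT).toEpochSecond();
        long widthSeconds = width.getSeconds();
        // floorDiv rounds toward negative infinity, so pre-1970 values also work.
        return Math.floorDiv(epochSeconds, widthSeconds) * widthSeconds;
    }
}
```

With a five-minute width, 12:43 and 12:44 fall into the bucket starting at
12:40, while 12:45 starts the next bucket.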
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Kindly yours,
>>>>>
>>>>> Andrew Grammenos
>>>>>
>>>>> On Wed, Aug 26, 2015 at 1:37 AM, Alec Lee <[email protected]
>>>>> <mailto:[email protected]>> wrote:
>>>>> Hi, all
>>>>>
>>>>> Is there any sample code for sorting events by the timestamp field of a
>>>>> tuple?
>>>>>
>>>>> thanks
>>>>>
>>>>>
>>>>> AL