Thanks, this is what I ended up with, which seems to work:
// select incoming data and round the timestamp to a 1-minute bucket
.each(new Fields("data"), new BucketAndResource(), new Fields("bucket", "resourceId"))
// group data
.groupBy(new Fields("bucket", "resourceId"))
// remove duplicate resources in the same bucket
.aggregate(new Fields("resourceId"), new FirstN.FirstNAgg(1), new Fields("resourceIdAgg"))
// MyAgg counts all instances of each bucket
.aggregate(new Fields("bucket"), new MyAgg(), new Fields("bucketAgg", "count"))
/ Jonas
2014-03-23 10:31 GMT+01:00 Vinoth Kumar Kannan <[email protected]>:
> This could be one of the approaches. Your topology may follow this flow:
>
>    1. a function to round the timestamp to the nearest whole minute for
>    each tuple
>    2. groupBy two fields, i.e. the rounded timestamp and the resourceId
>    3. aggregate resourceId to remove duplicates
>    4. groupBy the rounded timestamp
>    5. aggregating the result by the rounded timestamp gives the result
>    you expect
>
> On Sun, Mar 23, 2014 at 9:27 AM, Jonas Bergström <[email protected]> wrote:
>
>> Do you mean using two different Trident states? Well, I want to group on
>> resource within the time-bucket grouping, so I don't see how that would
>> work.
>> Using regular bolts, I group on resource-id and each bolt keeps track of
>> its own time buckets, which are then merged regularly by a global bolt.
>>
>> But it would be nice to make use of Trident states so that I don't have
>> to implement my own persistence of the bolt state.
>>
>>
>> / Jonas
>>
>>
>> 2014-03-21 18:08 GMT+01:00 Susheel Kumar Gadalay <[email protected]>:
>>
>>> Implement two bolts subscribing to the same spout's tuples:
>>> bolt1 groups by timestamp and bolt2 groups by resource id.
>>>
>>> A spout can have multiple bolts following it.
>>>
>>>
>>> On 3/21/14, Jonas Bergström <[email protected]> wrote:
>>> > Hi, I've just started using Storm and have a couple of questions.
>>> >
>>> > I have a stream of events, each consisting of a timestamp and a
>>> > resource-id, and I want to "bucket" them into discrete time buckets,
>>> > e.g. 1 minute long, and also group on resource-id so that even if the
>>> > same resource-id is encountered multiple times during the same time
>>> > bucket it is only counted once.
>>> >
>>> > I'm mapping the timestamp onto a date string with minute granularity
>>> > and grouping on that, which works fine. But I don't understand how to
>>> > add the grouping on resource-id as well.
>>> >
>>> > For example, I want the following stream [timestamp,id]:
>>> > "2014-03-20 14:18:32,887,1"
>>> > "2014-03-20 14:18:42,887,2"
>>> > "2014-03-20 14:18:52,887,1"
>>> > "2014-03-20 14:18:57,887,1"
>>> > "2014-03-20 14:18:58,887,3"
>>> > "2014-03-20 14:19:07,887,1"
>>> >
>>> > to result in [timebucket,count]:
>>> > "2014-03-20 14:18:00,3"
>>> > "2014-03-20 14:19:00,1"
>>> >
>>> > Any ideas?
>>> > I already implemented this using tick tuples and grouping on
>>> > resource-id, but I want to use Trident instead and be able to catch up
>>> > properly if I restart the Storm cluster.
>>> >
>>> > Also, I read in several places that one can have a spout batch by
>>> > "punctuation", which fits my use case well. But I haven't understood
>>> > how this can be implemented. Does anybody have any pointers?
>>> >
>>> >
>>> > Many thanks / Jonas
>>> >
>>>
>>
>>
>
>
> --
> With Regards,
> Vinoth Kumar K
>