Re: [Dev] [wso2/siddhi] Has problems with Siddhi group by with a time window (#154)

2016-06-03 Thread Tishan Dahanayakage
Ralph,

Please find inline.

On Fri, Jun 3, 2016 at 4:06 PM, Liangfei.Su  wrote:

> Sorry, I don't follow. On the second query callback, shouldn't there be
> two group results? (Why isn't the first tuple expected to be in the
> groups?)
>
In ExternalTimeWindow, when a new event arrives, a copy of it is stored and
the original is sent out as a current event, along with any stored events
that have expired. In this case no events expire during the period in which
the input is received, so every time only one event arrives at the callback
(events can sometimes be grouped because they are published in quick
succession, but that is not guaranteed). The output of the ExternalTime
window does not represent the events that reside inside the window at that
time. Hope this clears your doubt.
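To illustrate the sliding-window behavior described above, here is a toy Java model. It is not Siddhi's implementation. It assumes that Garrett's four events (millisecond offsets taken from the timestamps in his output) all share one uuid and similarityId, and that an event expires once it is 2 minutes or more older than the newest event. Each arrival hands the callback a single current event, yet the window itself holds 1, 2, 3 and then 1 events.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model (not Siddhi code) of a sliding external-time window. */
public class SlidingWindowSketch {
    static final long WINDOW_MS = 2 * 60 * 1000L; // 2 min, as in the query

    /** Returns, per arrival, how many events the window holds after that arrival. */
    public static List<Integer> windowSizes(long[] timestamps) {
        Deque<Long> window = new ArrayDeque<>();
        List<Integer> sizes = new ArrayList<>();
        for (long ts : timestamps) {
            // Assumed expiry rule: drop events at least WINDOW_MS older than the new one.
            while (!window.isEmpty() && window.peekFirst() <= ts - WINDOW_MS) {
                window.pollFirst();
            }
            window.addLast(ts);
            // The callback would see only this new event (a chunk of size 1),
            // while an aggregate computed over the window sees window.size() events.
            sizes.add(window.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Offsets in ms from 01:22:07.579 for the four events in Garrett's output.
        long[] ts = {0L, 735L, 7_438L, 179_438L};
        System.out.println(windowSizes(ts)); // [1, 2, 3, 1]
    }
}
```

The fourth event arrives almost three minutes after the third, so by then all earlier events have left the window.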

>
>
>
> I understand the external time batch window, as I was working on it. :)
>

Oh OK. So you are @RalphSu. Just connected the dots :) Thanks again for
your contribution.

/Tishan



Re: [Dev] [wso2/siddhi] Has problems with Siddhi group by with a time window (#154)

2016-06-03 Thread Liangfei.Su
Sorry, I don't follow. On the second query callback, shouldn't there be two
group results? (Why isn't the first tuple expected to be in the groups?)



I understand the external time batch window, as I was working on it. :)


Ralph



Re: [Dev] [wso2/siddhi] Has problems with Siddhi group by with a time window (#154)

2016-06-03 Thread Tishan Dahanayakage
Yeah. If that block is inside your siddhiBolt.queryCallback, then each time
you receive an event you create a new Map (tempMap), so your count will
always be 1. For your requirement you have to use the external time batch
window, as Charini mentioned earlier.
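A minimal plain-Java sketch of the point above, with no Siddhi dependency; `CallbackCountSketch`, `onCallback` and `accumulated` are hypothetical names. When every callback carries one event, a map created inside the callback always reports 1, while a map kept as a field grows across callbacks. The field version never evicts anything, so it is not a substitute for a window; it only shows why the local tempMap cannot see earlier events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration; not the SiddhiBolt from the thread. */
public class CallbackCountSketch {
    // Survives across callbacks (but is never pruned, unlike a window).
    private final Map<String, List<String>> accumulated = new HashMap<>();

    /** Simulates one callback; each event is {uuid, message}. Returns {local, accumulated}. */
    public int[] onCallback(String[][] inEvents, String uuid) {
        // Local map: rebuilt on every callback, like tempMap in the thread.
        Map<String, List<String>> tempMap = new HashMap<>();
        for (String[] e : inEvents) {
            tempMap.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            accumulated.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
        }
        int local = tempMap.getOrDefault(uuid, Collections.<String>emptyList()).size();
        int total = accumulated.getOrDefault(uuid, Collections.<String>emptyList()).size();
        return new int[] {local, total};
    }

    public static void main(String[] args) {
        CallbackCountSketch bolt = new CallbackCountSketch();
        for (String m : new String[] {"first", "second", "third"}) {
            // Each callback delivers exactly one event, as with the sliding window.
            int[] counts = bolt.onCallback(new String[][] {{"uuid-1", m}}, "uuid-1");
            System.out.println("local=" + counts[0] + ", accumulated=" + counts[1]);
        }
        // local=1, accumulated=1
        // local=1, accumulated=2
        // local=1, accumulated=3
    }
}
```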

/Tishan


Re: [Dev] [wso2/siddhi] Has problems with Siddhi group by with a time window (#154)

2016-06-03 Thread Liangfei.Su
Garrett might provide more details.

There is a formatting issue on GitHub. Following the code: basically, the
output tries to read the event data into a Map of instanceUuid -> list of
events with different similarityIds, then prints the count for each
instanceUuid.

See

String message = String.format("Count: %s; message=%s, timestamp=%s,
similarityId=%s,






Re: [Dev] [wso2/siddhi] Has problems with Siddhi group by with a time window (#154)

2016-06-03 Thread Tishan Dahanayakage
Ralph,

Does the following code reside inside your siddhiBolt.queryCallback?

Map<String, List<BootCorrelationAggregationData>> tempMap =
        new HashMap<String, List<BootCorrelationAggregationData>>();
for (Event event : inEvents) {
    Object[] data = event.getData();
    BootCorrelationAggregationData aggregateData = new BootCorrelationAggregationData();
    aggregateData.setSimilarityId((String) data[0]);
    aggregateData.setMessage((String) data[1]);
    aggregateData.setUuid((String) data[2]);
    aggregateData.setTimestamp((Long) data[3]);
    if (!tempMap.containsKey(aggregateData.getUuid())) {
        tempMap.put(aggregateData.getUuid(), new ArrayList<BootCorrelationAggregationData>());
    }
    tempMap.get(aggregateData.getUuid()).add(aggregateData);
}

Thanks
/Tishan


Re: [Dev] [wso2/siddhi] Has problems with Siddhi group by with a time window (#154)

2016-06-03 Thread Liangfei.Su
+Garrett

I understand that there is an output for each input event. The question is
that the count is always 1, while I'm expecting count=1/2/3: e.g. when the
second input message is sent in, the first input message should still be
kept in the window, so I expect two results for that group...






Re: [Dev] [wso2/siddhi] Has problems with Siddhi group by with a time window (#154)

2016-06-02 Thread Charini Nanayakkara
Hi Liangfei.Su,

This is the expected behavior of the external time window. Since it is a
sliding (moving) time window, you get an output for each incoming event.
Because of the group by clause, each output is an aggregate over the events
with the same uuid and similarityId that arrived within the last two
minutes.

If you need only one output per group, use the external time batch window
instead, and ensure that all the events arrive within the time duration you
specify.
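A rough Java model of the batch behavior described above, under stated assumptions. It is not Siddhi's implementation: it assumes a batch opens at the first event's external timestamp and is flushed when an event arrives at or past the batch end, emitting one aggregate per (uuid, similarityId) group, and it simply restarts the next batch at that event (Siddhi may align batch boundaries differently). The group keys `uuid-1|sim-A` are hypothetical. In the query itself, the change would be replacing `#window.externalTime(timestamp, 2 min)` with `#window.externalTimeBatch(timestamp, 2 min)`; verify the exact flush semantics against your Siddhi version.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Siddhi code) of an external-time batch window with group by. */
public class BatchWindowSketch {
    static final long WINDOW_MS = 2 * 60 * 1000L;

    /** Minimal event: group key plus the external timestamp. */
    public static final class Evt {
        public final String group; // e.g. uuid + "|" + similarityId
        public final long ts;
        public Evt(String group, long ts) { this.group = group; this.ts = ts; }
    }

    /** Returns one "group=count" entry per group for every flushed batch. */
    public static List<String> run(List<Evt> events) {
        List<String> out = new ArrayList<>();
        Map<String, Integer> counts = new LinkedHashMap<>(); // keeps group order stable
        Long batchStart = null;
        for (Evt e : events) {
            if (batchStart == null) {
                batchStart = e.ts;
            } else if (e.ts >= batchStart + WINDOW_MS) {
                flush(counts, out);
                batchStart = e.ts; // simplified: next batch starts at this event
            }
            counts.merge(e.group, 1, Integer::sum);
        }
        return out; // the still-open last batch has not been emitted yet
    }

    private static void flush(Map<String, Integer> counts, List<String> out) {
        for (Map.Entry<String, Integer> c : counts.entrySet()) {
            out.add(c.getKey() + "=" + c.getValue());
        }
        counts.clear();
    }

    public static void main(String[] args) {
        List<Evt> events = new ArrayList<>();
        events.add(new Evt("uuid-1|sim-A", 0L));
        events.add(new Evt("uuid-1|sim-A", 735L));
        events.add(new Evt("uuid-1|sim-A", 7_438L));
        events.add(new Evt("uuid-1|sim-A", 179_438L)); // closes the first batch
        System.out.println(run(events)); // [uuid-1|sim-A=3]
    }
}
```

Note that the batch is only emitted when a later event crosses the batch end, which is one reason to make sure all related events arrive within the specified duration.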

Regards,
Charini

On Fri, Jun 3, 2016 at 6:16 AM, Liangfei.Su  wrote:

> +WSO2 @DL
>
> Could someone knowledgeable explain this?
>
> https://github.com/wso2/siddhi/issues/154
>
>
> Thanks,
> Ralph
>
>
> On Fri, Jun 3, 2016 at 8:21 AM, Garrett  wrote:
>
>> I have a problem with Siddhi group by with a time window. I get the
>> correct aggregation results for each group, but I receive one aggregated
>> result per event, not one per group.
>>
>> Here is the query:
>> define stream bootCorrelationStream (logLevel string, message string,
>>     similarityId string, timestamp long, uuid string);
>>
>> @info(name = 'bootCorrelation')
>> from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min)
>> select similarityId, adanos:first(message) as message,
>>     min(timestamp) as startTime, max(timestamp) as endTime
>> group by uuid, similarityId
>> insert into tempStream;
>>
>> Here is the output:
>> Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579
>> Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314
>> Count: 1; message=ERROR third message, timestamp=2016-05-21 01:22:15.017
>> Count: 1; message=ERROR fourth message, timestamp=2016-05-21 01:25:07.017
>>
>> Here is the main code:
>> register callback
>>
>> this.siddhiRuntime = new SiddhiRuntimeHolder();
>> this.siddhiRuntime.siddhiManager = new SiddhiManager();
>> ExecutionPlanRuntime executionPlanRuntime =
>>         this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
>> this.siddhiRuntime.inputHandler =
>>         executionPlanRuntime.getInputHandler(this.getStreamName());
>>
>> final SiddhiBolt siddhiBolt = this;
>> this.siddhiRuntime.queryCallback = new QueryCallback() {
>>     @Override
>>     public void receive(long timeStamp,
>>                         org.wso2.siddhi.core.event.Event[] inEvents,
>>                         org.wso2.siddhi.core.event.Event[] removeEvents) {
>>         // Invoked each time the query emits output; delegate to the bolt.
>>         siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
>>     }
>> };
>> // The first argument is a query name; it should match the
>> // @info(name = ...) annotation in the plan ('bootCorrelation' below).
>> executionPlanRuntime.addCallback("query", this.siddhiRuntime.queryCallback);
>> executionPlanRuntime.start();
>>
>>
>> query details
>>
>> public static String generateExecutionPlan() {
>>     // make sure the field names are sorted
>>     StringBuilder executionPlan = new StringBuilder(" define stream bootCorrelationStream ");
>>     executionPlan.append("(logLevel string, message string, similarityId string, timestamp long, uuid string); ");
>>     executionPlan.append(" @info(name = 'bootCorrelation') ");
>>     // externalTimeBatch(timestamp, 5 min): batch time window for the specified timestamp
>>     executionPlan.append("from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min) ");
>>     executionPlan.append(" select similarityId, adanos:first(message) as message, uuid, ");
>>     executionPlan.append(" min(timestamp) as startTime, max(timestamp) as endTime ");
>>     executionPlan.append(" group by uuid, similarityId ");
>>     executionPlan.append(" insert into tempStream; ");
>>     appendLogs(executionPlan.toString());
>>     return executionPlan.toString();
>> }
>>
>> callback details
>>
>> Map> tempMap = new HashMap>();
>> for (Event event : inEvents) {
>> Object[] data = event.getData();
>> BootCorrelationAggregationData aggregateData = new
>> BootCorrelationAggregationData();
>> aggregateData.setSimilarityId((String) data[0]);
>> aggregateData.setMessage((String) data[1]);
>> aggregateData.setUuid((String) data[2]);
>> aggregateData.setTimestamp((Long) data[3]);
>> if (!tempMap.containsKey(aggregateData.getUuid())) {
>> tempMap.put(aggregateData.getUuid(), new ArrayList<BootCorrelationAggregationData>());
>> }
>> tempMap.get(aggregateData.getUuid()).add(aggregateData);
>> }
>>
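>> List<BootCorrelationAggregationData> emitList = new ArrayList<BootCorrelationAggregationData>();
>> Iterator<Entry<String, List<BootCorrelationAggregationData>>> it = tempMap.entrySet().iterator();
>> while (it.hasNext()) {
>> Entry<String, List<BootCorrelationAggregationData>> entry = it.next();
>> List<BootCorrelationAggregationData> temp = entry.getValue();
>> Collections.sort(temp, new Comparator<BootCorrelationAggregationData>() {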
>> @Override
>> public int compare(BootCorrelationAggregationData o1, 
>> BootCorrelationAggregationData o2) {
>> // Long.compare avoids the int overflow of casting a large timestamp difference
>> return Long.compare(o1.getTimestamp(), o2.getTimestamp());
>> }
>> });
>>
>> if (temp.size() > 0) {
>> String message = String.format("Count: %s, for %s", temp.size(), 
>> ToStringBuilder.reflectionToString(temp.get(0)));
>> 
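The quoted callback builds `tempMap` inside each call, so it can only group the events delivered in that one `receive` invocation. A minimal Java sketch of that grouping step follows, assuming a hypothetical `Row` class in place of `BootCorrelationAggregationData` (its definition is not in the thread). It uses `computeIfAbsent` instead of the `containsKey`/`put` pair, and `Comparator.comparingLong` instead of `(int) (o1.getTimestamp() - o2.getTimestamp())`, because that cast overflows once two timestamps are more than 2^31 ms (roughly 25 days) apart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CallbackGrouping {

    // Hypothetical stand-in for BootCorrelationAggregationData,
    // whose definition is not shown in the thread.
    static final class Row {
        final String uuid;
        final String message;
        final long timestamp;

        Row(String uuid, String message, long timestamp) {
            this.uuid = uuid;
            this.message = message;
            this.timestamp = timestamp;
        }
    }

    // Groups the rows delivered in ONE callback invocation by uuid and
    // sorts each group by timestamp, oldest first.
    static Map<String, List<Row>> groupByUuid(List<Row> rowsFromOneCallback) {
        Map<String, List<Row>> groups = new HashMap<>();
        for (Row row : rowsFromOneCallback) {
            groups.computeIfAbsent(row.uuid, key -> new ArrayList<>()).add(row);
        }
        for (List<Row> group : groups.values()) {
            // comparingLong compares the raw long values (via Long.compare),
            // so it cannot overflow the way (int) (t1 - t2) can.
            group.sort(Comparator.comparingLong(r -> r.timestamp));
        }
        return groups;
    }

    public static void main(String[] args) {
        // One event per call, as a sliding window delivers it: every group has size 1.
        Map<String, List<Row>> perEvent =
                groupByUuid(Arrays.asList(new Row("u1", "ERROR first message", 1_000L)));
        System.out.println("Count: " + perEvent.get("u1").size());

        // Several rows in one call: groups can now hold more than one row.
        Map<String, List<Row>> perBatch = groupByUuid(Arrays.asList(
                new Row("u1", "ERROR second message", 3_000L),
                new Row("u2", "ERROR other message", 2_000L),
                new Row("u1", "ERROR first message", 1_000L)));
        System.out.println("Count: " + perBatch.get("u1").size()
                + ", first=" + perBatch.get("u1").get(0).message);
    }
}
```

The sketch only changes how the per-call map is filled and sorted; it does not change the fact that a map created inside the callback never sees events from earlier calls.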

Re: [Dev] [wso2/siddhi] Has problems with Siddhi group by with a time window (#154)

2016-06-02 Thread Liangfei.Su
+WSO2 @DL

Could someone knowledgeable explain this?

https://github.com/wso2/siddhi/issues/154


Thanks,
Ralph
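Tishan's explanation in this thread is that the sliding `externalTime` window forwards every arriving event as a current event, so the grouped query emits once per event, whereas `externalTimeBatch` emits once per batch. The following Java sketch is a simplified model of that difference, not Siddhi's implementation. The batch-boundary rule (batches start at the first event's timestamp and are flushed when an event at or past the batch end arrives), the assumption that all four events share one `(uuid, similarityId)` key, and the names `WindowEmissionModel`, `slidingOutputCount` and `batchOutputs` are all hypothetical. The timestamps are the offsets of the four events in Garrett's quoted output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model, NOT Siddhi's implementation: it only counts how many
// output rows a grouped query would produce under each kind of window.
public class WindowEmissionModel {

    // Sliding window (externalTime): each arriving event produces one output
    // row for its group, so there is one output per input event.
    static int slidingOutputCount(List<String> groupKeys) {
        return groupKeys.size();
    }

    // Batch window (externalTimeBatch), simplified: batches of windowMs start at the
    // first event's timestamp; a batch is emitted only when an event at or past its
    // end arrives, as one row per group key. Assumes non-decreasing timestamps.
    static List<Map<String, Integer>> batchOutputs(long[] timestamps, List<String> groupKeys, long windowMs) {
        List<Map<String, Integer>> emitted = new ArrayList<>();
        if (timestamps.length == 0) {
            return emitted;
        }
        long batchEnd = timestamps[0] + windowMs;
        Map<String, Integer> current = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            while (timestamps[i] >= batchEnd) {
                if (!current.isEmpty()) {
                    emitted.add(current); // one row per group key, with its event count
                    current = new LinkedHashMap<>();
                }
                batchEnd += windowMs;
            }
            current.merge(groupKeys.get(i), 1, Integer::sum);
        }
        // The last batch is still open: it is emitted only when a later event closes it.
        return emitted;
    }

    public static void main(String[] args) {
        // Offsets in ms from 01:22:07.579, derived from the four timestamps in the thread.
        long[] offsets = {0L, 735L, 7_438L, 179_438L};
        // Hypothetical: assume all four events share one (uuid, similarityId) key.
        List<String> keys = Arrays.asList("u1|s1", "u1|s1", "u1|s1", "u1|s1");
        long twoMinutes = 2 * 60 * 1000L;

        System.out.println("sliding outputs: " + slidingOutputCount(keys));
        System.out.println("batch outputs: " + batchOutputs(offsets, keys, twoMinutes));
    }
}
```

With those inputs the model reports four outputs for the sliding window and a single batch row, `{u1|s1=3}`; the fourth event starts a new batch that stays pending until a later event closes it.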


On Fri, Jun 3, 2016 at 8:21 AM, Garrett  wrote:

> I have problems with Siddhi group by with a time window. I get the correct
> aggregations for each group, but I receive one aggregated result per event,
> not one per group.
>
> Here is the query:
> define stream bootCorrelationStream (logLevel string, message string,
> similarityId string, timestamp long, uuid string); @info(name =
> 'bootCorrelation') from
> bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2
> min) select similarityId, adanos:first(message) as message, min(timestamp)
> as startTime, max(timestamp) as endTime group by uuid, similarityId insert
> into tempStream;
>
> Here is the output:
> Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579
> Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314
> Count: 1; message=ERROR third message, timestamp=2016-05-21 01:22:15.017
> Count: 1; message=ERROR fourth message, timestamp=2016-05-21 01:25:07.017
>
> Here is the main code:
> register callback
> this.siddhiRuntime = new SiddhiRuntimeHolder();
> this.siddhiRuntime.siddhiManager = new SiddhiManager();
> ExecutionPlanRuntime executionPlanRuntime =
> this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
> this.siddhiRuntime.inputHandler =
> executionPlanRuntime.getInputHandler(this.getStreamName());
>
> final SiddhiBolt siddhiBolt = this;
> this.siddhiRuntime.queryCallback = new QueryCallback() {
> @Override
> public void receive(long timeStamp, 
> org.wso2.siddhi.core.event.Event[] inEvents,
> org.wso2.siddhi.core.event.Event[] removeEvents) {
> siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
> }
> };
> executionPlanRuntime.addCallback("query", 
> this.siddhiRuntime.queryCallback);
> executionPlanRuntime.start();
>
>
> query details
>
> public static String generateExecutionPlan() {
> // make sure the fields' name sorted
> StringBuilder executionPlane = new StringBuilder(" define stream bootCorrelationStream ");
> executionPlane.append("(logLevel string, message string, similarityId string, timestamp long, uuid string); ");
> executionPlane.append(" @info(name = 'bootCorrelation') ");
> // externalTimeBatch(timestamp, 5 min), batch time window for specified timestamp
> executionPlane.append("from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min) ");
> executionPlane.append(" select similarityId, adanos:first(message) as message, uuid, ");
> executionPlane.append(" min(timestamp) as startTime, max(timestamp) as endTime ");
> executionPlane.append(" group by uuid, similarityId ");
> executionPlane.append(" insert into tempStream; ");
> appendLogs(executionPlane.toString());
> return executionPlane.toString();
> }
>
> callback details
>
> Map<String, List<BootCorrelationAggregationData>> tempMap = new HashMap<String, List<BootCorrelationAggregationData>>();
> for (Event event : inEvents) {
> Object[] data = event.getData();
> BootCorrelationAggregationData aggregateData = new
> BootCorrelationAggregationData();
> aggregateData.setSimilarityId((String) data[0]);
> aggregateData.setMessage((String) data[1]);
> aggregateData.setUuid((String) data[2]);
> aggregateData.setTimestamp((Long) data[3]);
> if (!tempMap.containsKey(aggregateData.getUuid())) {
> tempMap.put(aggregateData.getUuid(), new ArrayList<BootCorrelationAggregationData>());
> }
> tempMap.get(aggregateData.getUuid()).add(aggregateData);
> }
>
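> List<BootCorrelationAggregationData> emitList = new ArrayList<BootCorrelationAggregationData>();
> Iterator<Entry<String, List<BootCorrelationAggregationData>>> it = tempMap.entrySet().iterator();
> while (it.hasNext()) {
> Entry<String, List<BootCorrelationAggregationData>> entry = it.next();
> List<BootCorrelationAggregationData> temp = entry.getValue();
> Collections.sort(temp, new Comparator<BootCorrelationAggregationData>() {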
> @Override
> public int compare(BootCorrelationAggregationData o1, 
> BootCorrelationAggregationData o2) {
> // Long.compare avoids the int overflow of casting a large timestamp difference
> return Long.compare(o1.getTimestamp(), o2.getTimestamp());
> }
> });
>
> if (temp.size() > 0) {
> String message = String.format("Count: %s, for %s", temp.size(), 
> ToStringBuilder.reflectionToString(temp.get(0)));
> LOG.info(message);
> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
> Date firstTime = new Date(temp.get(0).getTimestamp());
> appendLogs(String.format("Count: %s; message=%s, timestamp=%s",
> temp.size(), temp.get(0).getMessage(), sdf.format(firstTime)));
> emitList.add(temp.get(0));
> }
> }
>
> if (emitList.size() > 0) {
> this.getOutputCollector().emit(new Values(emitList));
> LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s", 
> emitList.size()));
> for (BootCorrelationAggregationData data : emitList) {
> LOG.info(String.format("BootCorrelationSiddhiBolt emits %s", 
>