[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213850#comment-15213850 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-202250432 Great work @satishd . Merged into 1.x and master > Storm Trident support for sliding/tumbling windows > -- > > Key: STORM-676 > URL: https://issues.apache.org/jira/browse/STORM-676 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core >Reporter: Sriharsha Chintalapani >Assignee: Satish Duggana > Fix For: 1.0.0, 2.0.0 > > Attachments: StormTrident_windowing_support-676.pdf > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213849#comment-15213849 ] ASF GitHub Bot commented on STORM-676: -- Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/1263
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213816#comment-15213816 ] ASF GitHub Bot commented on STORM-676: -- Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/1072
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213345#comment-15213345 ] ASF GitHub Bot commented on STORM-676: -- GitHub user satishd opened a pull request: https://github.com/apache/storm/pull/1263

STORM-676: Trident implementation for sliding and tumbling windows for 1.x-branch

This PR is cherry-picked from master PR https://github.com/apache/storm/pull/1072

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/satishd/storm STORM-676-1.x

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1263.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1263

commit 89c03b83e990aa7fbc732caf055e6bd09e2fc479
Author: Satish Duggana
Date: 2016-03-23T06:50:21Z

    STORM-676 Upmerged and resolved conflicts

commit 532bb79b30fdfee04c24d7ed04f63b65c4c44862
Author: Satish Duggana
Date: 2016-03-13T04:55:21Z

    STORM-676 addressed review comments

commit dd02bcfdcfe6f92fef055a933722a7b485c8e613
Author: Satish Duggana
Date: 2016-03-15T08:48:30Z

    STORM-676 Addressed review comments on API aligning with core window API

commit 3a96f20f09b3eaccc59d333a581c3b0c7d345ccc
Author: Satish Duggana
Date: 2016-03-23T06:05:37Z

    STORM-676 Addressed review comments from Arun

commit b08d7eaf7099e4da74010501a189818ec11b00bc
Author: Satish Duggana
Date: 2016-03-23T13:33:55Z

    STORM-676 Addressed review comments

commit 8c263c7c08a3ebd11a1d5df4996b7e12422cd721
Author: Satish Duggana
Date: 2016-03-23T17:05:08Z

    STORM-676 Refactoring of WindowConfig APIs
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213346#comment-15213346 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-201992895 @harshach Raised https://github.com/apache/storm/pull/1263 on 1.x-branch.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213126#comment-15213126 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-201904903 @harshach Upmerged and refactored the HBase put API calls to match the recent HBase version upgrade on the master branch.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213101#comment-15213101 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-201882512

@satishd
"[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project storm-hbase: Compilation failure: Compilation failure:
[ERROR] /Users/harsha/code/apache/storm/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java:[210,25] unreported exception java.io.IOException; must be caught or declared to be thrown
[ERROR] /Users/harsha/code/apache/storm/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java:[229,25] unreported exception java.io.IOException; must be caught or declared to be thrown
[ERROR] -> [Help 1]"
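The compiler error quoted above is the standard Java rule for checked exceptions: a call that declares `java.io.IOException` must sit in a `try`/`catch` or in a method that declares `throws IOException`. The following is only a minimal sketch of that rule. `TableClient` is a hypothetical stand-in, not the HBase client API, and wrapping in `UncheckedIOException` is one possible choice (the quoted `HBaseWindowsStore` code wraps in `RuntimeException` elsewhere).

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a client whose write call declares IOException.
// This is NOT the HBase API; it only illustrates the compiler rule.
interface TableClient {
    void put(String key, byte[] value) throws IOException;
}

final class CheckedExceptionSketch {
    // store() has no "throws" clause, so the checked exception must be caught
    // here; rethrowing it unchecked keeps the method signature unchanged.
    static void store(TableClient client, String key, byte[] value) {
        try {
            client.put(key, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TableClient failing = (key, value) -> {
            throw new IOException("simulated failure");
        };
        try {
            store(failing, "k", new byte[0]);
        } catch (UncheckedIOException e) {
            System.out.println("wrapped: " + e.getCause().getMessage());
        }
    }
}
```

Removing the `try`/`catch` from `store` reproduces the "unreported exception java.io.IOException" message.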
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213090#comment-15213090 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-201879790 @satishd can you open a PR against 1.x-branch as well?
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211582#comment-15211582 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-201194400 +1 for merging this. If we are including this in 1.0, it's better to update the Trident API docs in 1.0 as well.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211434#comment-15211434 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-201139676 @arunmahadevan looks like all comments are addressed. Can you please take a look?
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15209695#comment-15209695 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r57272190

--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
+    public static final String UTF_8 = "utf-8";
+
+    private final ThreadLocal<HTable> threadLocalHtable;
+    private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+    private final byte[] family;
+    private final byte[] qualifier;
+
+    public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+        this.family = family;
+        this.qualifier = qualifier;
+
+        threadLocalHtable = new ThreadLocal<HTable>() {
+            @Override
+            protected HTable initialValue() {
+                try {
+                    HTable hTable = new HTable(config, tableName);
+                    htables.add(hTable);
+                    return hTable;
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+    }
+
+    private HTable htable() {
+        return threadLocalHtable.get();
+    }
+
+    private byte[] effectiveKey(String key) {
+        try {
+            return key.getBytes(UTF_8);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Object get(String key) {
+        WindowsStore.Entry.nonNullCheckForKey(key);
+
+        byte[] effectiveKey = effectiveKey(key);
+        Get get = new Get(effectiveKey);
+        Result result = null;
+        try {
+            result = htable().get(get);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        if(result.isEmpty()) {
+            return null;
+        }
+
+        Kryo kryo = new Kryo();
+        Input input = new Input(result.getValue(family, qualifier));
+        Object resultObject = kryo.readClassAndObject(input);
+        return resultObject;
+
+    }
+
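The quoted constructor pairs a `ThreadLocal` with a shared queue: each thread lazily gets its own table handle, and every handle is also recorded so it can be released later. A minimal, self-contained sketch of that pattern follows. `Handle`, `handle()` and `closeAll()` are hypothetical names used for illustration only, and whether the real class closes its tables this way is an assumption, since the quoted diff is cut off before any cleanup code.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class PerThreadResourceSketch {
    // Hypothetical resource standing in for a per-thread table handle.
    static final class Handle {
        final int id;
        volatile boolean closed;
        Handle(int id) { this.id = id; }
        void close() { closed = true; }
    }

    private final AtomicInteger nextId = new AtomicInteger();
    // Every handle ever created, so a single thread can later close all of them.
    private final Queue<Handle> handles = new ConcurrentLinkedQueue<>();
    // Each thread gets its own handle the first time it calls handle().
    private final ThreadLocal<Handle> local = ThreadLocal.withInitial(() -> {
        Handle h = new Handle(nextId.incrementAndGet());
        handles.add(h);
        return h;
    });

    Handle handle() {
        return local.get();
    }

    // Closes all handles created so far and returns how many there were.
    int closeAll() {
        int n = 0;
        for (Handle h : handles) {
            h.close();
            n++;
        }
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        PerThreadResourceSketch store = new PerThreadResourceSketch();
        Handle mine = store.handle();
        Thread other = new Thread(store::handle); // creates a second handle
        other.start();
        other.join();
        System.out.println("same handle on same thread: " + (mine == store.handle()));
        System.out.println("handles closed: " + store.closeAll());
    }
}
```

The queue is what makes cleanup possible: a `ThreadLocal` alone gives no way to reach the values held by other threads.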
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15209117#comment-15209117 ] ASF GitHub Bot commented on STORM-676: -- Github user ptgoetz commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r57232035 --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java --- @@ -0,0 +1,275 @@
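The `effectiveKey` helper in the `HBaseWindowsStore` diff under review looks the charset up by name, which is why it has to catch `UnsupportedEncodingException`. As a hedged aside that nobody in this thread proposed, the JDK's `StandardCharsets.UTF_8` constant (Java 7 and later) avoids the checked exception. The class name below is hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class KeyEncodingSketch {
    // Encodes the key as UTF-8 bytes. getBytes(Charset) declares no checked
    // exception, unlike getBytes(String charsetName).
    static byte[] effectiveKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(effectiveKey("window-key").length);
    }
}
```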
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208416#comment-15208416 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-200349889

- [STORM-1651](https://issues.apache.org/jira/browse/STORM-1651) is created for adding event-time-based support in Trident windowing.
- [STORM-1652](https://issues.apache.org/jira/browse/STORM-1652) is created for adding API/design docs. Design doc content will be taken from the doc attached to the [STORM-676](https://issues.apache.org/jira/browse/STORM-676) JIRA.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208396#comment-15208396 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r57158337

--- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java ---
@@ -40,7 +42,7 @@ public Debug(String name) {
     @Override
     public boolean isKeep(TridentTuple tuple) {
--- End diff --

I would like to keep these changes as it is useful to see the timestamp in Debug output. I do not think we need a separate PR for this minor change.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208276#comment-15208276 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-200303590

1. The event-time-based windows are not exposed via Trident; can you file a follow-up JIRA?
2. The Trident API docs need to be updated with the windowing APIs.
3. It would be good to add the implementation details and content from the design doc PDF into a separate .md file under docs.

I will do a final pass after the comments are addressed.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208269#comment-15208269 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r57143513

--- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java ---
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.planner.ProcessorContext;
+import org.apache.storm.trident.planner.TridentProcessor;
+import org.apache.storm.trident.planner.processor.FreshCollector;
+import org.apache.storm.trident.planner.processor.TridentContext;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.ConsList;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * {@code TridentProcessor} implementation for windowing operations on trident stream.
+ *
+ */
+public class WindowTridentProcessor implements TridentProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowTridentProcessor.class);
+
+    public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
+    public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
+    public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR;
+
+    public static final String TRIGGER_FIELD_NAME = "_task_info";
+    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100l;
--- End diff --

use capital 'L', with small 'l' it appears like 1001
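The review comment above is purely about readability: both suffixes produce the same `long` value, but a lowercase `l` is easily misread as the digit `1`. A tiny sketch with hypothetical constant names:

```java
final class LongLiteralSketch {
    // Legal Java, but "100l" is easy to misread as the int literal 1001.
    static final long LOWERCASE_SUFFIX = 100l;
    // Same value, unambiguous to the reader.
    static final long UPPERCASE_SUFFIX = 100L;

    public static void main(String[] args) {
        System.out.println(LOWERCASE_SUFFIX == UPPERCASE_SUFFIX);
    }
}
```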
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208261#comment-15208261 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r57143240

--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                 .aggregate(inputFields, agg, functionFields)
                 .chainEnd();
     }
-
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents no of tuples in the window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+                                 Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+     * and slides the window with {@code slideCount}.
+     *
+     * @param windowCount represents tuples count of a window
+     * @param slideCount the number of tuples after which the window slides
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+                                Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+     *
+     * @param windowDuration represents tumbling window duration configuration
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
--- End diff --

`@return the new Stream`
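The count-based `tumblingWindow` and `slidingWindow` methods in the quoted diff differ only in how far the window advances between aggregations. The sketch below illustrates that idea in plain Java and is not Storm's implementation. The class and method names are hypothetical, summing stands in for an arbitrary `Aggregator`, and emitting partially filled windows at the start of the stream is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class CountWindowSketch {
    // Emits the sum of the most recent windowCount values after every
    // slideCount values. slideCount == windowCount gives a tumbling window;
    // slideCount < windowCount gives overlapping (sliding) windows.
    static List<Integer> windowSums(int[] values, int windowCount, int slideCount) {
        List<Integer> sums = new ArrayList<>();
        Deque<Integer> window = new ArrayDeque<>();
        int sinceLastEmit = 0;
        for (int v : values) {
            window.addLast(v);
            if (window.size() > windowCount) {
                window.removeFirst(); // evict the oldest value
            }
            sinceLastEmit++;
            if (sinceLastEmit == slideCount) {
                int sum = 0;
                for (int w : window) {
                    sum += w;
                }
                sums.add(sum);
                sinceLastEmit = 0;
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6};
        System.out.println("tumbling(3):  " + windowSums(data, 3, 3));
        System.out.println("sliding(3,1): " + windowSums(data, 3, 1));
    }
}
```

With a tumbling window each value contributes to exactly one result, while with a sliding window a value can contribute to several consecutive results.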
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208263#comment-15208263 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r57143253

--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                 .aggregate(inputFields, agg, functionFields)
                 .chainEnd();
     }
-
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+     *
+     * @param windowCount represents no of tuples in the window
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+                                 Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+     * and slides the window with {@code slideCount}.
+     *
+     * @param windowCount represents tuples count of a window
+     * @param slideCount the number of tuples after which the window slides
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+                                Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+     *
+     * @param windowDuration represents tumbling window duration configuration
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
+     */
+    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+                                 Fields inputFields, Aggregator aggregator, Fields functionFields) {
+        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+    }
+
+    /**
+     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+     * and completes a window at {@code windowDuration}
+     *
+     * @param windowDuration represents window duration configuration
+     * @param slidingInterval the time duration after which the window slides
+     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+     * @param inputFields projected fields for aggregator
+     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+     * @param functionFields fields of values to emit with aggregation.
+     *
+     * @return
--- End diff --

`@return the new Stream`
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208264#comment-15208264 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143259

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount the number of tuples after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
    +     *
    +     * @param windowDuration represents tumbling window duration configuration
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
    +     * and completes a window at {@code windowDuration}
    +     *
    +     * @param windowDuration represents window duration configuration
    +     * @param slidingInterval the time duration after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
    +            WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator,
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208260#comment-15208260 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143236

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount the number of tuples after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
    --- End diff --

    of a window that tumbles

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208265#comment-15208265 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143268

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount the number of tuples after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
    +     *
    +     * @param windowDuration represents tumbling window duration configuration
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
    +     * and completes a window at {@code windowDuration}
    +     *
    +     * @param windowDuration represents window duration configuration
    +     * @param slidingInterval the time duration after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
    +            WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator,
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208262#comment-15208262 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143245

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount the number of tuples after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
    +     *
    +     * @param windowDuration represents tumbling window duration configuration
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
    --- End diff --

    `@code slidingInterval`

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
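Editorial illustration of the duration-based Javadoc under review: the relationship between a window duration and a sliding interval can be pictured with a small stand-alone sketch. It does not use Storm. The class name `DurationWindowSketch`, the method `windowStartsFor`, and the assumption that windows start at multiples of the sliding interval (counted from time zero) are all hypothetical simplifications and are not taken from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: not Storm's TumblingDurationWindow or SlidingDurationWindow. */
public class DurationWindowSketch {

    /**
     * Returns the start times of every window [start, start + windowMs) that contains
     * timestampMs, newest first. Assumption of this sketch: windows start at multiples
     * of slideMs. A tumbling window is the special case slideMs == windowMs.
     */
    public static List<Long> windowStartsFor(long timestampMs, long windowMs, long slideMs) {
        if (timestampMs < 0 || windowMs <= 0 || slideMs <= 0) {
            throw new IllegalArgumentException("timestamp must be >= 0 and durations must be > 0");
        }
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long latestStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = latestStart; start >= 0 && start + windowMs > timestampMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10 s window sliding every 5 s: the tuple at 7 s belongs to two windows.
        System.out.println(windowStartsFor(7_000, 10_000, 5_000)); // [5000, 0]
        // 5 s tumbling window: the same tuple belongs to exactly one window.
        System.out.println(windowStartsFor(7_000, 5_000, 5_000));  // [5000]
    }
}
```

With a sliding interval shorter than the window duration a tuple is aggregated more than once, which is why the two parameters need distinct, accurate names in the Javadoc.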
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208266#comment-15208266 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143286

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java ---
    @@ -40,7 +42,7 @@ public Debug(String name) {
         @Override
         public boolean isKeep(TridentTuple tuple) {
    --- End diff --

    Since you replaced Debug with peek, these changes can be reverted.

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208256#comment-15208256 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143182

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    --- End diff --

    no -> number

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208257#comment-15208257 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143185

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    --- End diff --

    `@return the new Stream`

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208255#comment-15208255 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143154

    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---
    @@ -0,0 +1,54 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.hbase.trident.windowing;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.storm.trident.windowing.WindowsStore;
    +import org.apache.storm.trident.windowing.WindowsStoreFactory;
    +
    +import java.util.Map;
    +
    +/**
    + *
    --- End diff --

    Add some comments or remove this.

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208259#comment-15208259 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143217

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount the number of tuples after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    --- End diff --

    `@return the new Stream`

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15208258#comment-15208258 ]

ASF GitHub Bot commented on STORM-676:
--

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143211

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    --- End diff --

    slides the window after slideCount

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
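Editorial illustration of the wording discussed in this comment: "slides the window after slideCount" can be made concrete with a stand-alone sketch of count-based windows. It does not use Storm, and the class `CountWindowSketch` and its methods are hypothetical names. It also assumes that a partially filled window is emitted at the first slide; the thread does not say whether the real implementation behaves that way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: not Storm's TumblingCountWindow or SlidingCountWindow. */
public class CountWindowSketch {

    /** Tumbling window: consecutive, non-overlapping groups of windowCount items. */
    public static <T> List<List<T>> tumbling(List<T> items, int windowCount) {
        // A tumbling window is a sliding window whose slide equals its size.
        return sliding(items, windowCount, windowCount);
    }

    /**
     * Sliding window: after every slideCount items, emit the most recent
     * windowCount items (or fewer, if that many have not arrived yet).
     */
    public static <T> List<List<T>> sliding(List<T> items, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be > 0");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int end = slideCount; end <= items.size(); end += slideCount) {
            int start = Math.max(0, end - windowCount);
            windows.add(new ArrayList<>(items.subList(start, end)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> tuples = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(tumbling(tuples, 3));   // [[1, 2, 3], [4, 5, 6]]
        System.out.println(sliding(tuples, 4, 2)); // [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
    }
}
```

In the sliding case each tuple can appear in several emitted windows, which is the behaviour the suggested phrasing is meant to convey.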
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15207935#comment-15207935 ]

ASF GitHub Bot commented on STORM-676:
--

Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57113443

    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---
    @@ -0,0 +1,275 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.hbase.trident.windowing;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
    +import org.apache.storm.trident.windowing.WindowsStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InterruptedIOException;
    +import java.io.UnsupportedEncodingException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +
    +/**
    + * This class stores entries into hbase instance of the given configuration.
    + *
    + */
    +public class HBaseWindowsStore implements WindowsStore {
    +    private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
    +    public static final String UTF_8 = "utf-8";
    +
    +    private final ThreadLocal<HTable> threadLocalHtable;
    +    private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
    +    private final byte[] family;
    +    private final byte[] qualifier;
    +
    +    public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
    +        this.family = family;
    +        this.qualifier = qualifier;
    +
    +        threadLocalHtable = new ThreadLocal<HTable>() {
    +            @Override
    +            protected HTable initialValue() {
    +                try {
    +                    HTable hTable = new HTable(config, tableName);
    +                    htables.add(hTable);
    +                    return hTable;
    +                } catch (IOException e) {
    +                    throw new RuntimeException(e);
    +                }
    +            }
    +        };
    +
    +    }
    +
    +    private HTable htable() {
    +        return threadLocalHtable.get();
    +    }
    +
    +    private byte[] effectiveKey(String key) {
    +        try {
    +            return key.getBytes(UTF_8);
    +        } catch (UnsupportedEncodingException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public Object get(String key) {
    +        WindowsStore.Entry.nonNullCheckForKey(key);
    +
    +        byte[] effectiveKey = effectiveKey(key);
    +        Get get = new Get(effectiveKey);
    +        Result result = null;
    +        try {
    +            result = htable().get(get);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +
    +        if(result.isEmpty()) {
    +            return null;
    +        }
    +
    +        Kryo kryo = new Kryo();
    +        Input input = new Input(result.getValue(family, qualifier));
    +        Object resultObject = kryo.readClassAndObject(input);
    +        return resultObject;
    +
    +    }
    +
    +
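Editorial illustration, not part of the pull request: the `effectiveKey` helper in the quoted diff names the charset with a string, which is why it has to catch `UnsupportedEncodingException`. A minimal stand-alone sketch of the same key encoding with the JDK's `StandardCharsets.UTF_8` constant follows. The class `KeyEncodingSketch` and the `keyFromBytes` helper are hypothetical and exist only to show the round trip.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative only: a stand-alone variant of the key encoding shown in the diff. */
public class KeyEncodingSketch {

    /** Encodes a store key as UTF-8 bytes; the Charset overload declares no checked exception. */
    public static byte[] effectiveKey(String key) {
        if (key == null) {
            throw new IllegalArgumentException("key must not be null");
        }
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /** Hypothetical inverse, included only to demonstrate the round trip. */
    public static String keyFromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] encoded = effectiveKey("window-\u00e9");
        // Seven ASCII characters plus one two-byte character.
        System.out.println(encoded.length); // 9
        System.out.println(keyFromBytes(encoded).equals("window-\u00e9")); // true
    }
}
```

Whether the store should adopt this form is a question for the reviewers; the sketch only shows that the encoded bytes are the same as with the charset name.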
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15207900#comment-15207900 ]

ASF GitHub Bot commented on STORM-676:
--

Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57112111

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -565,19 +578,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount represents sliding count window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
    +     *
    +     * @param windowDuration represents tumbling window duration configuration
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
    +            Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
    +     * and completes a window at {@code windowDuration}
    +     *
    +     * @param windowDuration represents window duration configuration
    +     * @param slideDuration represents sliding duration configuration
    --- End diff --

    I will add that.

> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15207898#comment-15207898 ]

ASF GitHub Bot commented on STORM-676:

Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57111959

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -565,19 +578,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +                                 Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount represents sliding count window
    --- End diff --

    I used "count" in the name because it directly conveys that the value is a count. I will change the javadoc as you suggested so that it is in sync with the core API.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15207895#comment-15207895 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r57111677 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java --- @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.SlidingCountWindow; +import org.apache.storm.trident.windowing.config.SlidingDurationWindow; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.trident.windowing.config.TumblingDurationWindow; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Sample application of trident windowing which uses inmemory store for storing tuples in window. 
+ */ +public class TridentWindowingInmemoryStoreTopology { +private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class); + +public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception { +FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), +new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), +new Values("how many apples can you eat"), new Values("to be or not to be the person")); +spout.setCycle(true); + +TridentTopology topology = new TridentTopology(); + +Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), +new Split(), new Fields("word")) +.window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +//.aggregate(new CountAsAggregator(), new Fields("count")) +.each(new Fields("count"), new Debug()) +.each(new Fields("count"), new Echo(), new Fields("ct")) +.each(new Fields("ct"), new Debug()); --- End diff -- Peek API was not there when this code was written. I will use peek API here. :) > Storm Trident support for sliding/tumbling windows > -- > > Key: STORM-676 > URL: https://issues.apache.org/jira/browse/STORM-676 > Project: Apache
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15207894#comment-15207894 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r57111526 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class TridentHBaseWindowingStoreTopology { +private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class); + +public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception { +FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), +new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), +new Values("how many apples can you eat"), new Values("to be or not to be the person")); +spout.setCycle(true); + +TridentTopology topology = new TridentTopology(); + +Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), +new Split(), new 
Fields("word")) +.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +.each(new Fields("count"), new Debug()) +.each(new Fields("count"), new Echo(), new Fields("ct")); + +return topology.build(); +} + +public static class Split extends BaseFunction { +@Override +public void execute(TridentTuple tuple, TridentCollector collector) { +String sentence = tuple.getString(0); +for (String word : sentence.split(" ")) { +collector.emit(new Values(word)); +} +} +} + +static class Echo implements Function { + +@Override +public void execute(TridentTuple tuple, TridentCollector collector) { +LOG.info("##Echo.execute: " + tuple); +collector.emit(tuple.getValues()); +} + +@Override +public void prepare(Map conf, TridentOperationContext context) { + +
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15207870#comment-15207870 ]

ASF GitHub Bot commented on STORM-676:

Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57110471

    --- Diff: examples/storm-starter/pom.xml ---
    @@ -149,6 +152,11 @@
     storm-hdfs ${project.version}
    +
    + org.apache.storm
    --- End diff --

    This is the storm-hbase module version, not hbase.version.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203452#comment-15203452 ]

ASF GitHub Bot commented on STORM-676:

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r56769635

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -6,9 +6,9 @@
      * to you under the Apache License, Version 2.0 (the
      * "License"); you may not use this file except in compliance
      * with the License. You may obtain a copy of the License at
    - *
    + *
    --- End diff --

    This is not part of the license; you might want to remove this.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203454#comment-15203454 ]

ASF GitHub Bot commented on STORM-676:

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r56769646

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -565,19 +578,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +                                 Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount represents sliding count window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
    +                                Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
    +     *
    +     * @param windowDuration represents tumbling window duration configuration
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
    +                                 Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
    +     * and completes a window at {@code windowDuration}
    +     *
    +     * @param windowDuration represents window duration configuration
    +     * @param slideDuration represents sliding duration configuration
    --- End diff --

    Make it in sync with the core API: "@param slidingInterval the time duration after which the window slides"
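
The duration-based wording requested in this comment ("the time duration after which the window slides") can be illustrated with a small plain-Java sketch that does not depend on Storm. The class `DurationWindowSketch`, its `windows` method, and the millisecond parameters are hypothetical names chosen for illustration only. The sketch assumes a window fires every `slideMillis` and contains the events whose timestamps fall in the half-open interval `(fireTime - windowMillis, fireTime]`; Trident's real trigger and eviction behaviour may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Storm or Trident.
class DurationWindowSketch {

    // Assumed semantics: a window fires every slideMillis (starting at
    // slideMillis) and holds events with fire - windowMillis < t <= fire.
    static List<List<Long>> windows(List<Long> eventTimes, long windowMillis,
                                    long slideMillis, long endMillis) {
        if (windowMillis <= 0 || slideMillis <= 0) {
            throw new IllegalArgumentException("durations must be positive");
        }
        List<List<Long>> result = new ArrayList<>();
        for (long fire = slideMillis; fire <= endMillis; fire += slideMillis) {
            List<Long> window = new ArrayList<>();
            for (long t : eventTimes) {
                if (t > fire - windowMillis && t <= fire) {
                    window.add(t);
                }
            }
            result.add(window);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(100L, 900L, 1500L, 2100L, 2900L);
        // Sliding: 2 s window that slides every 1 s.
        System.out.println(windows(events, 2000, 1000, 3000));
        // Tumbling: the slide equals the window length.
        System.out.println(windows(events, 1000, 1000, 3000));
    }
}
```

Under these assumptions a tumbling window is just the case where the slide duration equals the window duration, so each event lands in exactly one window; with a shorter slide, consecutive windows overlap.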
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203456#comment-15203456 ]

ASF GitHub Bot commented on STORM-676:

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r56769652

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java ---
    @@ -40,7 +42,7 @@ public Debug(String name) {
         @Override
         public boolean isKeep(TridentTuple tuple) {
    -        System.out.println(name + tuple.toString());
    +        System.out.println("<"+new Date()+"> "+name + tuple.toString());
    --- End diff --

    Instead of modifying Debug, you could use the peek API, where you can specify custom actions.
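
The idea behind this suggestion, attaching a caller-supplied action to the stream rather than changing a shared built-in, can be sketched in plain Java. The sketch below does not call Trident's peek operation; `PeekSketch` is a hypothetical stand-in class and `java.util.function.Consumer` is used in place of whatever callback type Trident expects. It only shows that a timestamped log line can live in the caller's action while the tuples pass through unchanged.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a stream with a peek-style hook; not Storm's Stream class.
class PeekSketch<T> {
    private final List<T> tuples;

    PeekSketch(List<T> tuples) {
        this.tuples = new ArrayList<>(tuples);
    }

    // Runs the caller's action on every tuple and returns the same stream,
    // so the tuples themselves are left untouched.
    PeekSketch<T> peek(Consumer<T> action) {
        for (T tuple : tuples) {
            action.accept(tuple);
        }
        return this;
    }

    List<T> tuples() {
        return tuples;
    }

    public static void main(String[] args) {
        new PeekSketch<String>(Arrays.asList("the", "cow", "jumped"))
                // The timestamp prefix lives in the caller's action,
                // not in a shared debug filter.
                .peek(t -> System.out.println("<" + Instant.now() + "> " + t));
    }
}
```

Keeping the formatting in the action means that other users of a shared debug helper see no change in its output.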
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203453#comment-15203453 ]

ASF GitHub Bot commented on STORM-676:

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r56769644

    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -565,19 +578,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +                                 Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount represents sliding count window
    --- End diff --

    Make it in sync with the core API: "@param slidingInterval the number of tuples after which the window slides"
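
To make "the number of tuples after which the window slides" concrete, here is a minimal plain-Java sketch that is independent of Storm. The class `CountWindowSketch` and its `windows` method are hypothetical names for illustration, not part of Trident. It assumes the window fires after every `slideCount` tuples and holds at most the last `windowCount` tuples seen so far, so early windows may be partial; whether Trident emits partial windows is not confirmed by this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Storm or Trident.
class CountWindowSketch {

    // Assumed semantics: fire after every slideCount tuples, with the window
    // holding at most the last windowCount tuples seen so far.
    static <T> List<List<T>> windows(List<T> tuples, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int end = slideCount; end <= tuples.size(); end += slideCount) {
            int start = Math.max(0, end - windowCount);
            result.add(new ArrayList<>(tuples.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "cow", "jumped", "over", "the", "moon");
        // Tumbling (slideCount == windowCount):
        // prints [[the, cow, jumped], [over, the, moon]]
        System.out.println(windows(words, 3, 3));
        // Sliding by 2 with a window of 3:
        // prints [[the, cow], [cow, jumped, over], [over, the, moon]]
        System.out.println(windows(words, 3, 2));
    }
}
```

Under these assumptions a tumbling count window is the special case where the slide count equals the window count, which mirrors how `tumblingWindow` and `slidingWindow` in the diff both delegate to a single `window(...)` call with different configurations.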
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203450#comment-15203450 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769622 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java --- @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.SlidingCountWindow; +import org.apache.storm.trident.windowing.config.SlidingDurationWindow; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.trident.windowing.config.TumblingDurationWindow; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Sample application of trident windowing which uses inmemory store for storing tuples in window. 
+ */ +public class TridentWindowingInmemoryStoreTopology { +private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class); + +public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception { +FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), +new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), +new Values("how many apples can you eat"), new Values("to be or not to be the person")); +spout.setCycle(true); + +TridentTopology topology = new TridentTopology(); + +Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), +new Split(), new Fields("word")) +.window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +//.aggregate(new CountAsAggregator(), new Fields("count")) +.each(new Fields("count"), new Debug()) +.each(new Fields("count"), new Echo(), new Fields("ct")) +.each(new Fields("ct"), new Debug()); + +return topology.build(); +} + +public static class Split extends BaseFunction { --- End diff -- Why not reuse org.apache.storm.trident.testing.Split ? > Storm Trident support for sliding/tumbling windows > -- > > Key: STORM-676 >
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203448#comment-15203448 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769608 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class TridentHBaseWindowingStoreTopology { +private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class); + +public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception { +FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), +new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), +new Values("how many apples can you eat"), new Values("to be or not to be the person")); +spout.setCycle(true); + +TridentTopology topology = new TridentTopology(); + +Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), +new Split(), new 
Fields("word")) +.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +.each(new Fields("count"), new Debug()) +.each(new Fields("count"), new Echo(), new Fields("ct")); + +return topology.build(); +} + +public static class Split extends BaseFunction { +@Override +public void execute(TridentTuple tuple, TridentCollector collector) { +String sentence = tuple.getString(0); +for (String word : sentence.split(" ")) { +collector.emit(new Values(word)); +} +} +} + +static class Echo implements Function { + +@Override +public void execute(TridentTuple tuple, TridentCollector collector) { +LOG.info("##Echo.execute: " + tuple); +collector.emit(tuple.getValues()); +} + +@Override +public void prepare(Map conf, TridentOperationContext context) { +
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203449#comment-15203449 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769621 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java --- @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.SlidingCountWindow; +import org.apache.storm.trident.windowing.config.SlidingDurationWindow; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.trident.windowing.config.TumblingDurationWindow; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Sample application of trident windowing which uses inmemory store for storing tuples in window. 
+ */ +public class TridentWindowingInmemoryStoreTopology { +private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class); + +public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception { +FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), +new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), +new Values("how many apples can you eat"), new Values("to be or not to be the person")); +spout.setCycle(true); + +TridentTopology topology = new TridentTopology(); + +Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), +new Split(), new Fields("word")) +.window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +//.aggregate(new CountAsAggregator(), new Fields("count")) +.each(new Fields("count"), new Debug()) +.each(new Fields("count"), new Echo(), new Fields("ct")) +.each(new Fields("ct"), new Debug()); --- End diff -- Why two echo and one debug? To see the values isnt it enough to just print it once ? You could also use the peek api here. > Storm Trident support for sliding/tumbling windows > -- > > Key: STORM-676 > URL:
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203451#comment-15203451 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769629 --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java --- @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.hbase.trident.windowing; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.storm.trident.windowing.WindowsStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * This class stores entries into hbase instance of the given configuration. 
+ * + */ +public class HBaseWindowsStore implements WindowsStore { +private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class); +public static final String UTF_8 = "utf-8"; + +private final ThreadLocal threadLocalHtable; +private Queue htables = new ConcurrentLinkedQueue<>(); +private final byte[] family; +private final byte[] qualifier; + +public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) { +this.family = family; +this.qualifier = qualifier; + +threadLocalHtable = new ThreadLocal() { +@Override +protected HTable initialValue() { +try { +HTable hTable = new HTable(config, tableName); +htables.add(hTable); +return hTable; +} catch (IOException e) { +throw new RuntimeException(e); +} +} +}; + +} + +private HTable htable() { +return threadLocalHtable.get(); +} + +private byte[] effectiveKey(String key) { +try { +return key.getBytes(UTF_8); +} catch (UnsupportedEncodingException e) { +throw new RuntimeException(e); +} +} + +@Override +public Object get(String key) { +WindowsStore.Entry.nonNullCheckForKey(key); + +byte[] effectiveKey = effectiveKey(key); +Get get = new Get(effectiveKey); +Result result = null; +try { +result = htable().get(get); +} catch (IOException e) { +throw new RuntimeException(e); +} + +if(result.isEmpty()) { +return null; +} + +Kryo kryo = new Kryo(); +Input input = new Input(result.getValue(family, qualifier)); +Object resultObject = kryo.readClassAndObject(input); +return resultObject; + +} +
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203442#comment-15203442 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769589 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * --- End diff -- Maybe explain what this topology does and the various state options (in-memory vs. HBase, etc.). > Storm Trident support for sliding/tumbling windows > -- > > Key: STORM-676 > URL: https://issues.apache.org/jira/browse/STORM-676 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core >Reporter: Sriharsha Chintalapani >Assignee: Satish Duggana > Fix For: 1.0.0, 2.0.0 > > Attachments: StormTrident_windowing_support-676.pdf > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203439#comment-15203439 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769585 --- Diff: examples/storm-starter/pom.xml --- @@ -177,6 +185,36 @@ storm-redis ${project.version} + + org.apache.hbase --- End diff -- Why is the hbase-server dependency needed? storm-hbase already includes the hbase-server dependency, so this could be removed.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203446#comment-15203446 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769598 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class TridentHBaseWindowingStoreTopology { +private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class); + +public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception { +FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), +new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), +new Values("how many apples can you eat"), new Values("to be or not to be the person")); +spout.setCycle(true); + +TridentTopology topology = new TridentTopology(); + +Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), +new Split(), new 
Fields("word")) +.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +.each(new Fields("count"), new Debug()) +.each(new Fields("count"), new Echo(), new Fields("ct")); + +return topology.build(); +} + +public static class Split extends BaseFunction { --- End diff -- Why not reuse org.apache.storm.trident.testing.Split?
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203444#comment-15203444 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769591 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class TridentHBaseWindowingStoreTopology { +private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class); + +public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception { +FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), +new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), +new Values("how many apples can you eat"), new Values("to be or not to be the person")); +spout.setCycle(true); + +TridentTopology topology = new TridentTopology(); + +Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), +new Split(), new 
Fields("word")) +.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) +//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count")) --- End diff -- Remove this commented-out line. If you want to illustrate a time window, it's better to write a separate example.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15203437#comment-15203437 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56769580 --- Diff: examples/storm-starter/pom.xml --- @@ -31,10 +31,13 @@ storm-starter - UTF-8 - - provided +UTF-8 + +provided +0.98.4-hadoop2 +1.1.2 --- End diff -- hbase.version is defined twice
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15196873#comment-15196873 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-197175210 @harshach I haven't reviewed the code yet, plan to do that within a couple of days.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15195496#comment-15195496 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-196884101 +1. will wait for @arunmahadevan
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15194943#comment-15194943 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r56130662 --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java --- @@ -565,19 +578,169 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi .aggregate(inputFields, agg, functionFields) .chainEnd(); } - + +/** + * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples. + * + * @param windowCount represents window tuples count + * @param inputFields projected fields for aggregator + * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. + * @param functionFields fields of values to emit with aggregation. + * + * @return + */ +public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) { --- End diff -- @arunmahadevan Changed the API to align with the core windowing API.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193313#comment-15193313 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-196317380 @arunmahadevan What is the batch emit interval in your topology configuration? Each triggered result is emitted as part of the nearest completed batch. Trident runs in batches, so an emitted trigger becomes part of the latest completed batch; these details are described in the design doc attached to STORM-676. That is why results may have been emitted at the nearest completed batches. Can you please share your configuration so that I can confirm the behavior?
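The batch alignment described in the reply above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not Storm's implementation: it assumes window triggers fire at fixed multiples of the window length, that batches complete at fixed multiples of a batch interval, and that each trigger's result rides on the first batch completing at or after the trigger. The class and method names (`BatchAlignedTriggers`, `triggerTimes`, `emittingBatch`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of batch-aligned window results (hypothetical, not Storm code).
class BatchAlignedTriggers {

    // Times (in seconds) at which a tumbling time window of the given length fires,
    // assuming it fires at every multiple of windowSecs up to horizonSecs.
    static List<Integer> triggerTimes(int windowSecs, int horizonSecs) {
        List<Integer> times = new ArrayList<>();
        for (int t = windowSecs; t <= horizonSecs; t += windowSecs) {
            times.add(t);
        }
        return times;
    }

    // Completion time of the first batch at or after the trigger, assuming batches
    // complete at every multiple of batchIntervalSecs. Returns -1 when no batch
    // completes within the horizon, i.e. the result would never be emitted.
    static int emittingBatch(int triggerTime, int batchIntervalSecs, int horizonSecs) {
        int batch = ((triggerTime + batchIntervalSecs - 1) / batchIntervalSecs) * batchIntervalSecs;
        return batch <= horizonSecs ? batch : -1;
    }

    public static void main(String[] args) {
        // 3-second tumbling window, one batch every 5 seconds, observed for 20 seconds.
        for (int t : triggerTimes(3, 20)) {
            System.out.println("window fired at " + t + "s, result emitted with batch at "
                    + emittingBatch(t, 5, 20) + "s");
        }
    }
}
```

Under these assumptions a result never appears between batches, and a trigger with no later batch (the -1 case) is never emitted, which is the kind of behavior a spout that emits once and then sleeps would expose.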
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193155#comment-15193155 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-196279990 Haven't gone through the code in great detail, but when I ran a simple topology with a tumbling time window of 3 secs:
- If the spout emits a batch and sleeps, the result of the window aggregation is never emitted (ideally it should have been emitted at 3 secs).
- If the spout periodically emits batches (one batch every 5 secs), the time when the window output is generated always coincides with the time the spout emits the batches (i.e. at 10, 15, 20 secs), whereas one would expect the result to be emitted at 6, 12, 15, 18 etc. Also, only one result is emitted at time t = 10 secs, whereas two batches are complete at this time.
Can you verify whether this is the current behavior and, if so, fix it?
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193154#comment-15193154 ] ASF GitHub Bot commented on STORM-676: -- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55989920 --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java --- @@ -565,19 +578,169 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi .aggregate(inputFields, agg, functionFields) .chainEnd(); } - + +/** + * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples. + * + * @param windowCount represents window tuples count + * @param inputFields projected fields for aggregator + * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream. + * @param functionFields fields of values to emit with aggregation. + * + * @return + */ +public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) { --- End diff --
The various windowing configurations can be expressed with the `window(WindowConfig windowConfig,...)`. If we are adding wrappers over it, can you please maintain compatibility with the core APIs? i.e.,
```java
tumblingWindow(Count count)
tumblingWindow(Duration duration)
window(Count windowLength, Count slidingInterval)
window(Duration windowLength, Duration slidingInterval)
```
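The four overloads requested in the comment above can be sketched as thin wrappers that delegate to one general window method. This is a standalone illustration, not the Trident API: `Count`, `Duration`, and `WindowSpec` here are hypothetical stand-in types defined in the block itself (not Storm's classes), and the methods return a plain description of the window rather than a `Stream`. It relies on the common convention that a tumbling window is a sliding window whose sliding interval equals its length.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins; none of these types are Storm's own classes.
class WindowApiSketch {

    // Stand-in for a tuple-count parameter.
    static final class Count {
        final int value;
        Count(int value) { this.value = value; }
    }

    // Stand-in for a time-length parameter, normalized to milliseconds.
    static final class Duration {
        final long millis;
        Duration(long amount, TimeUnit unit) { this.millis = unit.toMillis(amount); }
    }

    // Stand-in for a window configuration: unit kind, window length, sliding interval.
    static final class WindowSpec {
        final String kind;
        final long length;
        final long slide;

        WindowSpec(String kind, long length, long slide) {
            this.kind = kind;
            this.length = length;
            this.slide = slide;
        }

        // A window tumbles when it slides by exactly its own length.
        boolean isTumbling() { return length == slide; }

        @Override
        public String toString() {
            return kind + " window, length=" + length + ", slide=" + slide;
        }
    }

    // General forms: window length plus sliding interval.
    static WindowSpec window(Count windowLength, Count slidingInterval) {
        return new WindowSpec("count", windowLength.value, slidingInterval.value);
    }

    static WindowSpec window(Duration windowLength, Duration slidingInterval) {
        return new WindowSpec("duration", windowLength.millis, slidingInterval.millis);
    }

    // Tumbling wrappers delegate to the general forms with slide == length.
    static WindowSpec tumblingWindow(Count count) { return window(count, count); }

    static WindowSpec tumblingWindow(Duration duration) { return window(duration, duration); }

    public static void main(String[] args) {
        System.out.println(tumblingWindow(new Count(1000)));
        System.out.println(window(new Duration(10, TimeUnit.SECONDS), new Duration(2, TimeUnit.SECONDS)));
    }
}
```

Keeping the wrappers as pure delegation means any validation or store handling would live in one place, which is the compatibility argument the reviewer makes for mirroring the core windowing method names.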
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192457#comment-15192457 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55941142 --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java --- @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.hbase.trident.windowing; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.storm.trident.windowing.WindowsStore; +import org.apache.storm.trident.windowing.WindowsStoreFactory; + +import java.util.Map; + +public class HBaseWindowsStoreFactory implements WindowsStoreFactory { +private final Mapconfig; +private final String tableName; +private final byte[] family; +private final byte[] qualifier; + +public HBaseWindowsStoreFactory(Map config, String tableName, byte[] family, byte[] qualifier) { --- End diff -- @satishd isn't table name, cf are specific to Hbase. 
In that case a config method can take a map and do its validation, and this validation will be specific to HBaseWindowsStoreFactory.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192456#comment-15192456 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55941127 --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java --- @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.hbase.trident.windowing; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.storm.trident.windowing.WindowsStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * This class stores entries into hbase instance of the given configuration. 
+ * + */ +public class HBaseWindowsStore implements WindowsStore { +private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class); +public static final String UTF_8 = "utf-8"; + +private final ThreadLocal threadLocalHtable; +private Queue htables = new ConcurrentLinkedQueue<>(); +private final byte[] family; +private final byte[] qualifier; + +public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) { +this.family = family; +this.qualifier = qualifier; + +threadLocalHtable = new ThreadLocal() { +@Override +protected HTable initialValue() { +try { +HTable hTable = new HTable(config, tableName); +htables.add(hTable); +return hTable; +} catch (IOException e) { +throw new RuntimeException(e); +} +} +}; + +} + +private HTable htable() { +return threadLocalHtable.get(); +} + +private byte[] effectiveKey(String key) { +try { +return key.getBytes(UTF_8); +} catch (UnsupportedEncodingException e) { +throw new RuntimeException(e); +} +} + +@Override +public Object get(String key) { +WindowsStore.Entry.nonNullCheckForKey(key); + +byte[] effectiveKey = effectiveKey(key); +Get get = new Get(effectiveKey); +Result result = null; +try { +result = htable().get(get); +} catch (IOException e) { +throw new RuntimeException(e); +} + +if(result.isEmpty()) { +return null; +} + +Kryo kryo = new Kryo(); +Input input = new Input(result.getValue(family, qualifier)); +Object resultObject = kryo.readClassAndObject(input); +return resultObject; + +} + +
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192230#comment-15192230 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55934087 --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java --- @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.hbase.trident.windowing; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.storm.trident.windowing.WindowsStore; +import org.apache.storm.trident.windowing.WindowsStoreFactory; + +import java.util.Map; + +public class HBaseWindowsStoreFactory implements WindowsStoreFactory { +private final Mapconfig; +private final String tableName; +private final byte[] family; +private final byte[] qualifier; + +public HBaseWindowsStoreFactory(Map config, String tableName, byte[] family, byte[] qualifier) { --- End diff -- @harshach All these fields should be available for windows-store-factory and they are not optional, that is why they are passed with the constructor. We may enclose them in a Configuration class(with immutable fields) and pass it to the constructor like the way it is done for HBaseStateFactory(Options). > Storm Trident support for sliding/tumbling windows > -- > > Key: STORM-676 > URL: https://issues.apache.org/jira/browse/STORM-676 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core >Reporter: Sriharsha Chintalapani >Assignee: Satish Duggana > Fix For: 1.0.0, 2.0.0 > > Attachments: StormTrident_windowing_support-676.pdf > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
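A minimal sketch of the alternative described above: the mandatory fields are enclosed in an immutable holder and passed to the factory constructor. `WindowsStoreOptions` and `OptionsBackedStoreFactory` are hypothetical names used only for illustration; they are not classes from the pull request, and the sketch does not reproduce how `HBaseStateFactory` handles its options.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical immutable holder for the mandatory settings; not part of the PR.
final class WindowsStoreOptions {
    private final String tableName;
    private final byte[] family;
    private final byte[] qualifier;

    WindowsStoreOptions(String tableName, byte[] family, byte[] qualifier) {
        this.tableName = Objects.requireNonNull(tableName, "tableName");
        // Defensive copies keep the instance immutable even if the caller reuses the arrays.
        this.family = Objects.requireNonNull(family, "family").clone();
        this.qualifier = Objects.requireNonNull(qualifier, "qualifier").clone();
    }

    String tableName() { return tableName; }
    byte[] family() { return family.clone(); }
    byte[] qualifier() { return qualifier.clone(); }
}

// Hypothetical factory that receives every mandatory field through one constructor argument.
final class OptionsBackedStoreFactory {
    private final WindowsStoreOptions options;

    OptionsBackedStoreFactory(WindowsStoreOptions options) {
        this.options = Objects.requireNonNull(options, "options");
    }

    // Renders the settings as table/family:qualifier, purely for demonstration.
    String describe() {
        return options.tableName() + "/" + new String(options.family(), StandardCharsets.UTF_8)
                + ":" + new String(options.qualifier(), StandardCharsets.UTF_8);
    }
}
```

Because every field is set in the constructor, a factory built this way cannot exist in a half-configured state.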
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192224#comment-15192224 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55933892 --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java --- @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.starter.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.BaseAggregator; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.Function; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.CountAsAggregator; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.trident.windowing.WindowsStoreFactory; +import org.apache.storm.trident.windowing.config.SlidingCountWindow; +import org.apache.storm.trident.windowing.config.SlidingDurationWindow; +import org.apache.storm.trident.windowing.config.TumblingCountWindow; +import org.apache.storm.trident.windowing.config.TumblingDurationWindow; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** --- End diff -- Updated with missing java doc. 
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192153#comment-15192153 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55932588 --- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java --- @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing; + +/** + * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for + * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}. + * + */ +public class InMemoryWindowsStoreFactory implements WindowsStoreFactory { --- End diff -- @harshach Each InMemoryWindowsStoreFactory instance always holds the same inMemoryWindowsStore instance. The same InMemoryWindowsStoreFactory instance is passed to WindowsStateUpdater, which removes successfully emitted triggers from that inMemoryWindowsStore instance in WindowsStateUpdater#updateState(WindowsState, List, TridentCollector).
The code snippet below is from Stream#window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields, boolean storeTuplesInStore)

``` java
// when storeTuplesInStore is false then the given windowStoreFactory is only used to store triggers and
// that store is passed to WindowsStateUpdater to remove them after committing the batch.
Stream stream = _topology.addSourcedNode(this,
        new ProcessorNode(_topology.getUniqueStreamId(),
                _name,
                fields,
                fields,
                new WindowTridentProcessor(windowConfig, _topology.getUniqueWindowId(), windowStoreFactory,
                        inputFields, aggregator, storeTuplesInStore)));

Stream effectiveStream = stream.project(functionFields);

// create StateUpdater with the given windowStoreFactory to remove triggered aggregation results from store
// when they are successfully processed.
StateFactory stateFactory = new WindowsStateFactory();
StateUpdater stateUpdater = new WindowsStateUpdater(windowStoreFactory);
stream.partitionPersist(stateFactory, new Fields(WindowTridentProcessor.TRIGGER_FIELD_NAME), stateUpdater, new Fields());
```

Updated the java doc with the details below.

``` java
/**
 * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for
 * storing tuples and triggers of the window. The same InMemoryWindowsStoreFactory instance is passed to {@link WindowsStateUpdater},
 * which removes successfully emitted triggers from the same {@code inMemoryWindowsStore} instance in
 * {@link WindowsStateUpdater#updateState(WindowsState, List, TridentCollector)}.
 *
 */
public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
```
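The sharing described above can be sketched as follows. This is a simplified illustration, not the code from the pull request: `SketchStore`, `SharedStoreFactory`, and `TriggerCleaner` are hypothetical stand-ins for the windows store, its in-memory factory, and the state updater. The point is only that one factory instance hands the same store to both the code that writes triggers and the code that removes them.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for an in-memory windows store.
final class SketchStore {
    private final Map<String, Object> entries = new ConcurrentHashMap<>();

    void put(String key, Object value) { entries.put(key, value); }
    Object get(String key) { return entries.get(key); }
    void remove(String key) { entries.remove(key); }
}

// Hypothetical factory: creates the store lazily, then returns that same instance on every call.
final class SharedStoreFactory {
    private SketchStore store;

    synchronized SketchStore create() {
        if (store == null) {
            store = new SketchStore();
        }
        return store;
    }
}

// Plays the role of the state updater: removes a trigger once its batch has been committed.
final class TriggerCleaner {
    private final SharedStoreFactory factory;

    TriggerCleaner(SharedStoreFactory factory) { this.factory = factory; }

    void onCommitted(String triggerKey) {
        factory.create().remove(triggerKey);
    }
}
```

In this sketch the sharing holds only within a single factory instance in one JVM; two separately constructed factories would each get their own store.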
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192124#comment-15192124 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55932046 --- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java --- @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.trident.windowing; + +import com.google.common.collect.Lists; +import org.apache.storm.coordination.BatchOutputCollector; +import org.apache.storm.trident.operation.Aggregator; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.windowing.config.WindowConfig; +import org.apache.storm.trident.windowing.strategy.WindowStrategy; +import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TriggerPolicy; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.apache.storm.windowing.WindowManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing + * tuples and triggers related information. 
+ * + */ +public abstract class AbstractTridentWindowManager implements ITridentWindowManager { +private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class); + +protected final WindowManager windowManager; +protected final Aggregator aggregator; +protected final BatchOutputCollector delegateCollector; +protected final String windowTaskId; +protected final WindowsStore windowStore; + +protected final Set activeBatches = new HashSet<>(); +protected final Queue pendingTriggers = new ConcurrentLinkedQueue<>(); +protected final AtomicInteger triggerId = new AtomicInteger(); +private final String windowTriggerCountId; +private final TriggerPolicy triggerPolicy; + +public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, +Aggregator aggregator, BatchOutputCollector delegateCollector) { +this.windowTaskId = windowTaskId; +this.windowStore = windowStore; +this.aggregator = aggregator; +this.delegateCollector = delegateCollector; + +windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId; + +windowManager = new WindowManager<>(new TridentWindowLifeCycleListener()); + +WindowStrategy windowStrategy = WindowStrategyFactory.create(windowConfig); +EvictionPolicy evictionPolicy = windowStrategy.getEvictionPolicy(); +windowManager.setEvictionPolicy(evictionPolicy); +triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy); +windowManager.setTriggerPolicy(triggerPolicy); +} + +@Override +public void prepare() { +preInitialize(); + +initialize(); + +postInitialize(); +} + +private void preInitialize() { +log.debug("Getting current trigger count for this component/task"); +// get trigger count value from store +Object result = windowStore.get(windowTriggerCountId); +Integer currentCount
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192113#comment-15192113 ] ASF GitHub Bot commented on STORM-676: -- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55931828 --- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java --- @@ -40,7 +42,7 @@ public Debug(String name) { @Override public boolean isKeep(TridentTuple tuple) { -System.out.println(name + tuple.toString()); +System.out.println("<"+new Date()+"> "+name + tuple.toString()); --- End diff -- @harshach If debug output is always written to System.out, then we can change this to use a logger without any backward-compatibility issue. I was not sure whether that is the case, so I did not change the existing code.
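For comparison, a minimal sketch of what routing this output through a logger could look like. `LoggingDebugFilter` is a hypothetical stand-in for the `Debug` filter, it takes a plain `Object` instead of a `TridentTuple`, and it uses `java.util.logging` only so that the example has no dependencies; Storm itself uses SLF4J, as the imports in the other diffs show.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the Debug filter; not the class from the PR.
final class LoggingDebugFilter {
    private static final Logger LOG = Logger.getLogger(LoggingDebugFilter.class.getName());

    private final String name;

    LoggingDebugFilter(String name) {
        this.name = name;
    }

    // Builds the message without a hand-written timestamp; the logging backend
    // is expected to add one according to its own configuration.
    String format(Object tuple) {
        return name + ": " + tuple;
    }

    // Mirrors isKeep: log the tuple, then keep it in the stream.
    boolean isKeep(Object tuple) {
        LOG.info(format(tuple));
        return true;
    }
}
```

With a logger, the `new Date()` prefix from the diff would usually become unnecessary, since the output format is then a matter of logging configuration.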
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192059#comment-15192059 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on the pull request: https://github.com/apache/storm/pull/1072#issuecomment-195847392 Thanks @satishd. Sorry for taking a long time to review. I have a few minor nits; can you please address those?
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192058#comment-15192058 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55930607 --- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java --- @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing; + +import com.google.common.base.Preconditions; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; + +/** + * Store for storing window related entities like windowed tuples, triggers etc. + * + */ +public interface WindowsStore extends Serializable { + +/** + * This can be used as a separator while generating a key from sequence of strings. + */ +public static final String KEY_SEPARATOR = "|"; --- End diff -- It would be good to have a configure method that takes a Map config, rather than doing this via the constructor.
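The Javadoc in the diff above describes `KEY_SEPARATOR` as a separator for generating a key from a sequence of strings. A minimal sketch of that usage follows; `WindowKeys`, `keyOf`, and `partsOf` are hypothetical helpers for illustration, not methods of `WindowsStore`, and the sketch assumes that no key part contains the separator itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper class; only the separator value "|" comes from the quoted diff.
final class WindowKeys {
    static final String KEY_SEPARATOR = "|";

    // Joins the parts with KEY_SEPARATOR; parts are assumed not to contain the separator.
    static String keyOf(String... parts) {
        return String.join(KEY_SEPARATOR, parts);
    }

    // Splits a key back into its parts. The separator is quoted because "|" is a
    // regex metacharacter and String#split interprets its argument as a regex.
    static List<String> partsOf(String key) {
        return Arrays.asList(key.split(Pattern.quote(KEY_SEPARATOR)));
    }
}
```

For example, `WindowKeys.keyOf("tr", "window-1", "7")` yields `tr|window-1|7`, and `partsOf` recovers the three parts from that string.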
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192053#comment-15192053 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55930541 --- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java --- @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.trident.windowing; + +/** + * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for + * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}. + * + */ +public class InMemoryWindowsStoreFactory implements WindowsStoreFactory { --- End diff -- Is this intended to be a singleton?
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192048#comment-15192048 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55930477 --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java --- @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.hbase.trident.windowing; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.storm.trident.windowing.WindowsStore; +import org.apache.storm.trident.windowing.WindowsStoreFactory; + +import java.util.Map; + +public class HBaseWindowsStoreFactory implements WindowsStoreFactory { +private final Map config; +private final String tableName; +private final byte[] family; +private final byte[] qualifier; + +public HBaseWindowsStoreFactory(Map config, String tableName, byte[] family, byte[] qualifier) { --- End diff -- It would be better to do this via a config method rather than the constructor.
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192046#comment-15192046 ] ASF GitHub Bot commented on STORM-676: -- Github user harshach commented on a diff in the pull request: https://github.com/apache/storm/pull/1072#discussion_r55930467 --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java --- @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.hbase.trident.windowing; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.storm.trident.windowing.WindowsStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * This class stores entries into hbase instance of the given configuration. 
+ * + */ +public class HBaseWindowsStore implements WindowsStore { +private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class); +public static final String UTF_8 = "utf-8"; + +private final ThreadLocal threadLocalHtable; +private Queue htables = new ConcurrentLinkedQueue<>(); +private final byte[] family; +private final byte[] qualifier; + +public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) { +this.family = family; +this.qualifier = qualifier; + +threadLocalHtable = new ThreadLocal() { +@Override +protected HTable initialValue() { +try { +HTable hTable = new HTable(config, tableName); +htables.add(hTable); +return hTable; +} catch (IOException e) { +throw new RuntimeException(e); +} +} +}; + +} + +private HTable htable() { +return threadLocalHtable.get(); +} + +private byte[] effectiveKey(String key) { +try { +return key.getBytes(UTF_8); +} catch (UnsupportedEncodingException e) { +throw new RuntimeException(e); +} +} + +@Override +public Object get(String key) { +WindowsStore.Entry.nonNullCheckForKey(key); + +byte[] effectiveKey = effectiveKey(key); +Get get = new Get(effectiveKey); +Result result = null; +try { +result = htable().get(get); +} catch (IOException e) { +throw new RuntimeException(e); +} + +if(result.isEmpty()) { +return null; +} + +Kryo kryo = new Kryo(); +Input input = new Input(result.getValue(family, qualifier)); +Object resultObject = kryo.readClassAndObject(input); +return resultObject; + +} + +
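The constructor quoted above keeps one `HTable` per thread in a `ThreadLocal` and records every instance in a concurrent queue. The quoted diff is cut off before it shows how the queue is used, so the following is only a sketch of the general pattern, under the assumption that the queue exists so that all instances can be closed later. `SketchConnection` and `PerThreadConnections` are hypothetical names, and no HBase classes are involved.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a client object that must not be shared between threads.
final class SketchConnection implements AutoCloseable {
    private volatile boolean closed;

    @Override
    public void close() { closed = true; }

    boolean isClosed() { return closed; }
}

final class PerThreadConnections implements AutoCloseable {
    // Every connection ever handed out, so close() can also reach those owned by other threads.
    private final Queue<SketchConnection> all = new ConcurrentLinkedQueue<>();

    // Creates one connection per calling thread on first access and remembers it in the queue.
    private final ThreadLocal<SketchConnection> local = ThreadLocal.withInitial(() -> {
        SketchConnection connection = new SketchConnection();
        all.add(connection);
        return connection;
    });

    SketchConnection get() { return local.get(); }

    int created() { return all.size(); }

    @Override
    public void close() {
        for (SketchConnection connection : all) {
            connection.close();
        }
    }
}
```

A `ThreadLocal` alone gives no way to enumerate the values held by other threads, which is why this sketch tracks them separately.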
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192042#comment-15192042 ]

ASF GitHub Bot commented on STORM-676:
--------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r55930433

    --- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---
    @@ -0,0 +1,275 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.hbase.trident.windowing;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
    +import org.apache.storm.trident.windowing.WindowsStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InterruptedIOException;
    +import java.io.UnsupportedEncodingException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +
    +/**
    + * This class stores entries into hbase instance of the given configuration.
    + *
    + */
    +public class HBaseWindowsStore implements WindowsStore {
    +    private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
    +    public static final String UTF_8 = "utf-8";
    +
    +    private final ThreadLocal<HTable> threadLocalHtable;
    +    private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
    +    private final byte[] family;
    +    private final byte[] qualifier;
    +
    +    public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
    --- End diff --

    This looks like it would be better for it to have its own config method and take a Map config object to configure it.

> Storm Trident support for sliding/tumbling windows
> --------------------------------------------------
>
>                 Key: STORM-676
>                 URL: https://issues.apache.org/jira/browse/STORM-676
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Sriharsha Chintalapani
>            Assignee: Satish Duggana
>             Fix For: 1.0.0, 2.0.0
>
>         Attachments: StormTrident_windowing_support-676.pdf
>
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
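One way to read the review suggestion above (configure the store from a Map rather than through a four-argument constructor) is sketched below. This is an assumption about what was meant, not the change that was eventually merged. WindowsStoreSettings, fromMap and the key names are all invented for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical settings holder; the class name and key names are not from the patch. */
final class WindowsStoreSettings {
    static final String TABLE_KEY = "windows.store.table";
    static final String FAMILY_KEY = "windows.store.family";
    static final String QUALIFIER_KEY = "windows.store.qualifier";

    final String tableName;
    final byte[] family;
    final byte[] qualifier;

    private WindowsStoreSettings(String tableName, byte[] family, byte[] qualifier) {
        this.tableName = tableName;
        this.family = family;
        this.qualifier = qualifier;
    }

    /** Builds the settings from a generic config map, failing fast on missing keys. */
    static WindowsStoreSettings fromMap(Map<String, Object> conf) {
        return new WindowsStoreSettings(
                required(conf, TABLE_KEY),
                // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException
                // that getBytes("utf-8") forces callers to handle.
                required(conf, FAMILY_KEY).getBytes(StandardCharsets.UTF_8),
                required(conf, QUALIFIER_KEY).getBytes(StandardCharsets.UTF_8));
    }

    private static String required(Map<String, Object> conf, String key) {
        Object value = conf.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing config key: " + key);
        }
        return value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TABLE_KEY, "window-state");
        conf.put(FAMILY_KEY, "cf");
        conf.put(QUALIFIER_KEY, "tuples");

        WindowsStoreSettings settings = fromMap(conf);
        System.out.println("table: " + settings.tableName);
    }
}
```

A map-based entry point of this kind lets new options be added without changing a constructor signature, at the cost of losing compile-time checking of the argument list.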
[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192041#comment-15192041 ]

ASF GitHub Bot commented on STORM-676:
--------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r55930422

    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---
    @@ -0,0 +1,134 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.starter.trident;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.StormSubmitter;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.trident.Stream;
    +import org.apache.storm.trident.TridentTopology;
    +import org.apache.storm.trident.operation.BaseAggregator;
    +import org.apache.storm.trident.operation.BaseFunction;
    +import org.apache.storm.trident.operation.Function;
    +import org.apache.storm.trident.operation.TridentCollector;
    +import org.apache.storm.trident.operation.TridentOperationContext;
    +import org.apache.storm.trident.operation.builtin.Debug;
    +import org.apache.storm.trident.testing.CountAsAggregator;
    +import org.apache.storm.trident.testing.FixedBatchSpout;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
    +import org.apache.storm.trident.windowing.WindowsStoreFactory;
    +import org.apache.storm.trident.windowing.config.SlidingCountWindow;
    +import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
    +import org.apache.storm.trident.windowing.config.TumblingCountWindow;
    +import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
    +import org.apache.storm.trident.windowing.config.WindowConfig;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    --- End diff --

    nit: please remove empty comments

[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192024#comment-15192024 ]

ASF GitHub Bot commented on STORM-676:
--------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r55930081

    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---
    @@ -0,0 +1,134 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.starter.trident;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.StormSubmitter;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.topology.base.BaseWindowedBolt;
    +import org.apache.storm.trident.Stream;
    +import org.apache.storm.trident.TridentTopology;
    +import org.apache.storm.trident.operation.BaseAggregator;
    +import org.apache.storm.trident.operation.BaseFunction;
    +import org.apache.storm.trident.operation.Function;
    +import org.apache.storm.trident.operation.TridentCollector;
    +import org.apache.storm.trident.operation.TridentOperationContext;
    +import org.apache.storm.trident.operation.builtin.Debug;
    +import org.apache.storm.trident.testing.CountAsAggregator;
    +import org.apache.storm.trident.testing.FixedBatchSpout;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
    +import org.apache.storm.trident.windowing.WindowsStoreFactory;
    +import org.apache.storm.trident.windowing.config.SlidingCountWindow;
    +import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
    +import org.apache.storm.trident.windowing.config.TumblingCountWindow;
    +import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
    +import org.apache.storm.trident.windowing.config.WindowConfig;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + *
    + */
    +public class TridentWindowingInmemoryStoreTopology {
    +    private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
    --- End diff --

    In Storm we always use a capital LOG. Can you please make sure this is reflected across the patch?

[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows
[ https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150592#comment-15150592 ]

ASF GitHub Bot commented on STORM-676:
--------------------------------------

Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/1072#issuecomment-185238635

    Upmerged, resolved and rebased.