[GitHub] storm pull request: STORM-676: Trident implementation for sliding ...

2016-03-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1263


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-28 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-202250432
  
Great work @satishd. Merged into 1.x and master.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1072




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-26 Thread satishd
Github user satishd commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-201992895
  
@harshach Raised https://github.com/apache/storm/pull/1263 on 1.x-branch.




[GitHub] storm pull request: STORM-676: Trident implementation for sliding ...

2016-03-26 Thread satishd
GitHub user satishd opened a pull request:

https://github.com/apache/storm/pull/1263

STORM-676: Trident implementation for sliding and tumbling windows for 1.x-branch

This PR is cherry-picked from master PR https://github.com/apache/storm/pull/1072

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/satishd/storm STORM-676-1.x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1263.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1263


commit 89c03b83e990aa7fbc732caf055e6bd09e2fc479
Author: Satish Duggana 
Date:   2016-03-23T06:50:21Z

STORM-676 Upmerged and resolved conflicts

commit 532bb79b30fdfee04c24d7ed04f63b65c4c44862
Author: Satish Duggana 
Date:   2016-03-13T04:55:21Z

STORM-676 addressed review comments

commit dd02bcfdcfe6f92fef055a933722a7b485c8e613
Author: Satish Duggana 
Date:   2016-03-15T08:48:30Z

STORM-676 Addressed review comments on API aligning with core window API

commit 3a96f20f09b3eaccc59d333a581c3b0c7d345ccc
Author: Satish Duggana 
Date:   2016-03-23T06:05:37Z

STORM-676 Addressed review comments from Arun

commit b08d7eaf7099e4da74010501a189818ec11b00bc
Author: Satish Duggana 
Date:   2016-03-23T13:33:55Z

STORM-676 Addressed review comments

commit 8c263c7c08a3ebd11a1d5df4996b7e12422cd721
Author: Satish Duggana 
Date:   2016-03-23T17:05:08Z

STORM-676 Refactoring of WindowConfig APIs






[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-26 Thread satishd
Github user satishd commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-201904903
  
@harshach Upmerged and refactored the HBase put API usage for the recent HBase version upgrade in the master branch.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-26 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-201882512
  
@satishd 
"[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project storm-hbase: Compilation failure: Compilation failure:
[ERROR] /Users/harsha/code/apache/storm/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java:[210,25] unreported exception java.io.IOException; must be caught or declared to be thrown
[ERROR] /Users/harsha/code/apache/storm/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java:[229,25] unreported exception java.io.IOException; must be caught or declared to be thrown
[ERROR] -> [Help 1]"
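The compiler reports that two calls in `HBaseWindowsStore` (lines 210 and 229) can throw the checked `java.io.IOException`, presumably because the put calls changed with the HBase upgrade. A minimal sketch of one way to satisfy the compiler, assuming a hypothetical `WindowTable` interface as a stand-in for the HBase client (this is not the HBase API): catch the checked exception and rethrow it unchecked, mirroring how the store's `get` methods already wrap `IOException` in `RuntimeException`. `UncheckedIOException` (Java 8+) is swapped in here so the cause type stays obvious.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class CheckedPutDemo {

    // Hypothetical stand-in for an HBase table whose put(...) declares IOException.
    interface WindowTable {
        void put(String rowKey, byte[] value) throws IOException;
    }

    // Wraps the checked exception so a method that does not declare IOException
    // (such as an interface method like WindowsStore.put) still compiles.
    // The original IOException is kept as the cause.
    static void putEntry(WindowTable table, String rowKey, byte[] value) {
        try {
            table.put(rowKey, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        WindowTable ok = (k, v) -> written.add(k);
        putEntry(ok, "row-1", new byte[] {1});
        System.out.println(written); // [row-1]

        WindowTable failing = (k, v) -> {
            throw new IOException("region server down");
        };
        try {
            putEntry(failing, "row-2", new byte[] {2});
        } catch (UncheckedIOException e) {
            System.out.println("wrapped: " + e.getCause().getMessage()); // wrapped: region server down
        }
    }
}
```

Declaring `throws IOException` on the caller instead would only move the problem if the enclosing method implements an interface that does not declare it.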




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-26 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-201879790
  
@satishd Can you open a PR against 1.x-branch as well?




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-25 Thread arunmahadevan
Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-201194400
  
+1 for merging this. If we are including this in 1.0, it's better to update the Trident API docs in 1.0 as well.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-24 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-201139676
  
@arunmahadevan Looks like all comments are addressed. Can you please take a look?




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57272190
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal<HTable> threadLocalHtable;
+private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+this.family = family;
+this.qualifier = qualifier;
+
+threadLocalHtable = new ThreadLocal<HTable>() {
+@Override
+protected HTable initialValue() {
+try {
+HTable hTable = new HTable(config, tableName);
+htables.add(hTable);
+return hTable;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+};
+
+}
+
+private HTable htable() {
+return threadLocalHtable.get();
+}
+
+private byte[] effectiveKey(String key) {
+try {
+return key.getBytes(UTF_8);
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public Object get(String key) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] effectiveKey = effectiveKey(key);
+Get get = new Get(effectiveKey);
+Result result = null;
+try {
+result = htable().get(get);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+if(result.isEmpty()) {
+return null;
+}
+
+Kryo kryo = new Kryo();
+Input input = new Input(result.getValue(family, qualifier));
+Object resultObject = kryo.readClassAndObject(input);
+return resultObject;
+
+}
+
+@Override
+public Iterable<Object> get(List<String> keys) {
+List<Get> gets = new ArrayList<>();
+for (String key : keys) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] 
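The store above gives each thread its own `HTable` through a `ThreadLocal` and records every table it creates in a `ConcurrentLinkedQueue`, presumably so they can all be closed on shutdown (that part is cut off in this excerpt). A stdlib-only sketch of the same pattern, using a hypothetical `Resource` class in place of `HTable`. It also uses `StandardCharsets.UTF_8`, which would remove the need to catch `UnsupportedEncodingException` in `effectiveKey`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadResourceDemo {

    // Hypothetical stand-in for a non-thread-safe client such as HTable.
    static final class Resource implements AutoCloseable {
        final int id;
        volatile boolean closed;

        Resource(int id) {
            this.id = id;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private final AtomicInteger created = new AtomicInteger();
    // Every resource handed out is recorded so shutdown() can close all of them.
    private final Queue<Resource> allResources = new ConcurrentLinkedQueue<>();
    // Each thread lazily gets its own Resource on first access.
    private final ThreadLocal<Resource> perThread = ThreadLocal.withInitial(() -> {
        Resource r = new Resource(created.incrementAndGet());
        allResources.add(r);
        return r;
    });

    Resource resource() {
        return perThread.get();
    }

    int createdCount() {
        return created.get();
    }

    // StandardCharsets.UTF_8 avoids the checked exception thrown by getBytes("utf-8").
    static byte[] effectiveKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Drains the queue and closes every per-thread resource.
    void shutdown() {
        Resource r;
        while ((r = allResources.poll()) != null) {
            r.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PerThreadResourceDemo store = new PerThreadResourceDemo();
        Resource mainRes = store.resource();
        Thread worker = new Thread(store::resource);
        worker.start();
        worker.join();
        System.out.println(store.createdCount());        // 2
        System.out.println(mainRes == store.resource()); // true
        store.shutdown();
        System.out.println(mainRes.closed);              // true
    }
}
```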

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread ptgoetz
Github user ptgoetz commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57232035
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread satishd
Github user satishd commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-200349889
  
- [STORM-1651](https://issues.apache.org/jira/browse/STORM-1651) has been created to add event-time-based support to Trident windowing.
- [STORM-1652](https://issues.apache.org/jira/browse/STORM-1652) has been created to add API/design docs. The design doc content will be taken from the doc attached to the [STORM-676](https://issues.apache.org/jira/browse/STORM-676) JIRA.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57158337
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java ---
@@ -40,7 +42,7 @@ public Debug(String name) {
 
 @Override
 public boolean isKeep(TridentTuple tuple) {
--- End diff --

I would like to keep these changes, as it is useful to see the timestamp in the Debug output. I do not think this minor change needs a separate PR.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-200303590
  
1. Event-time-based windows are not exposed via Trident. Can you file a follow-up JIRA?
2. The Trident API docs need to be updated with the windowing APIs.
3. It would be good to add the implementation details and content from the design doc PDF to a separate .md file under docs.

I will do a final pass after the comments are addressed.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143584
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java ---
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.strategy;
+
+import org.apache.storm.trident.windowing.config.WindowConfig;
+
+/**
+ *
+ */
+public final class WindowStrategyFactory {
+
+private WindowStrategyFactory() {
+}
+
+/**
+ * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}.
+ *
+ * @param windowConfig
+ * @return
+ */
+public static  WindowStrategy create(WindowConfig windowConfig) {
+WindowStrategy windowStrategy = null;
+WindowConfig.Type windowType = windowConfig.getWindowType();
+switch(windowType) {
--- End diff --

Instead of switching, you can add a getWindowStrategy method to WindowConfig and invoke windowConfig.getWindowStrategy(). That way you don't need to maintain these enum constants, or add new constants when new window configs are added in the future.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143565
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java ---
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing.config;
+
+import java.io.Serializable;
+
+/**
+ * Windowing configuration with window and sliding length.
+ */
+public interface WindowConfig extends Serializable {
+
+/**
+ * Returns the length of the window.
+ * @return
+ */
+public int getWindowLength();
+
+/**
+ * Returns the sliding length of the moving window.
+ * @return
+ */
+public int getSlidingLength();
+
+/**
+ * Gives the type of windowing. It can be any of {@code Type} values.
+ *
+ * @return
+ */
+public Type getWindowType();
+
+public void validate();
+
+public enum Type {
--- End diff --

The subclasses already capture the type of the WindowConfig, so this enum can be avoided. Instead of switching on this type later to return the window strategy, you can add a method here that returns the window strategy and have the subclasses of WindowConfig return the appropriate WindowStrategy.
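A minimal sketch of the suggested refactoring: each `WindowConfig` implementation returns its own strategy, so the factory needs no `Type` enum and no `switch`. The `WindowStrategy` shape here (a single `triggerEvery()` method) and the class bodies are hypothetical simplifications, not Storm's actual classes.

```java
public class WindowConfigDemo {

    // Hypothetical, simplified strategy: fire the window after this many tuples.
    interface WindowStrategy {
        int triggerEvery();
    }

    // Each config knows which strategy it needs, so no Type enum is required.
    interface WindowConfig {
        int getWindowLength();

        int getSlidingLength();

        WindowStrategy getWindowStrategy();
    }

    static final class TumblingCountWindow implements WindowConfig {
        private final int length;

        TumblingCountWindow(int length) {
            this.length = length;
        }

        @Override
        public int getWindowLength() {
            return length;
        }

        // A tumbling window moves by its full length.
        @Override
        public int getSlidingLength() {
            return length;
        }

        @Override
        public WindowStrategy getWindowStrategy() {
            return () -> length;
        }
    }

    static final class SlidingCountWindow implements WindowConfig {
        private final int length;
        private final int slide;

        SlidingCountWindow(int length, int slide) {
            this.length = length;
            this.slide = slide;
        }

        @Override
        public int getWindowLength() {
            return length;
        }

        @Override
        public int getSlidingLength() {
            return slide;
        }

        @Override
        public WindowStrategy getWindowStrategy() {
            return () -> slide;
        }
    }

    // No switch: adding a new WindowConfig subclass requires no change here.
    static WindowStrategy create(WindowConfig config) {
        return config.getWindowStrategy();
    }

    public static void main(String[] args) {
        System.out.println(create(new TumblingCountWindow(5)).triggerEvery());     // 5
        System.out.println(create(new SlidingCountWindow(10, 2)).triggerEvery()); // 2
    }
}
```

The design trade-off is ordinary polymorphic dispatch: the knowledge of which strategy a config needs lives next to the config itself, instead of in a central switch that must be kept in sync.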




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143513
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java ---
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.planner.ProcessorContext;
+import org.apache.storm.trident.planner.TridentProcessor;
+import org.apache.storm.trident.planner.processor.FreshCollector;
+import org.apache.storm.trident.planner.processor.TridentContext;
+import org.apache.storm.trident.spout.IBatchID;
+import org.apache.storm.trident.tuple.ConsList;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * {@code TridentProcessor} implementation for windowing operations on trident stream.
+ *
+ */
+public class WindowTridentProcessor implements TridentProcessor {
+private static final Logger LOG = LoggerFactory.getLogger(WindowTridentProcessor.class);
+
+public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
+public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
+public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR;
+
+public static final String TRIGGER_FIELD_NAME = "_task_info";
+public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100l;
--- End diff --

Use a capital 'L'; with a lowercase 'l', `100l` looks like `1001`.
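Both suffixes produce the same `long` value; the change is purely about readability. A tiny illustration (the `WITH_LOWERCASE` name is hypothetical, for comparison only):

```java
public class LongLiteralDemo {
    // Lowercase suffix: legal, but easy to misread as the int literal 1001.
    static final long WITH_LOWERCASE = 100l;
    // Uppercase suffix, as suggested in the review: unambiguous.
    static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100L;

    public static void main(String[] args) {
        System.out.println(WITH_LOWERCASE == DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT); // true
        System.out.println(DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT);                   // 100
    }
}
```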




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143286
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java ---
@@ -40,7 +42,7 @@ public Debug(String name) {
 
 @Override
 public boolean isKeep(TridentTuple tuple) {
--- End diff --

Since you replaced Debug with peek, these changes can be reverted.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143253
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount the number of tuples after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+ * and completes a window at {@code windowDuration}
+ *
+ * @param windowDuration represents window duration configuration
+ * @param slidingInterval the time duration after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return

`@return the new Stream`
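As a reading aid for the count-based `tumblingWindow`/`slidingWindow` javadoc, here is an in-memory model of the semantics it describes: after every `slideCount` tuples, the last `windowCount` tuples are handed to the aggregator, and a tumbling window is the special case `slideCount == windowCount`. This is a sketch under those assumptions only; `CountWindowModel` is hypothetical and ignores Trident batching, the `WindowsStoreFactory`, and whatever partial-window handling Storm's real trigger and eviction policies apply.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class CountWindowModel {

    // Emits a snapshot after every slideCount tuples; each snapshot holds
    // at most the last windowCount tuples seen so far.
    static <T> List<List<T>> slidingCountWindows(List<T> tuples, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be positive");
        }
        Deque<T> window = new ArrayDeque<>();
        List<List<T>> emitted = new ArrayList<>();
        int sinceLastTrigger = 0;
        for (T tuple : tuples) {
            window.addLast(tuple);
            if (window.size() > windowCount) {
                window.removeFirst(); // evict the oldest tuple
            }
            if (++sinceLastTrigger == slideCount) {
                emitted.add(new ArrayList<>(window)); // what the aggregator would see
                sinceLastTrigger = 0;
            }
        }
        return emitted;
    }

    // A tumbling window is a sliding window whose slide equals its length.
    static <T> List<List<T>> tumblingCountWindows(List<T> tuples, int windowCount) {
        return slidingCountWindows(tuples, windowCount, windowCount);
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(slidingCountWindows(input, 3, 2)); // [[1, 2], [2, 3, 4], [4, 5, 6]]
        System.out.println(tumblingCountWindows(input, 3));   // [[1, 2, 3], [4, 5, 6]]
    }
}
```

Note that in this model the first sliding window fires before it is full; whether Storm does the same depends on its trigger policy.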




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143259
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount the number of tuples after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+ * and completes a window at {@code windowDuration}
+ *
+ * @param windowDuration represents window duration configuration
+ * @param slidingInterval the time duration after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
+WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of aggregated results based on the given window configuration which uses inmemory windowing tuple store.
+ *
+ * @param windowConfig window 

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143154
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+/**
+ *
--- End diff --

Add some comments or remove this.


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143240
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount the number of tuples after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
--- End diff --

`@return the new Stream`


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143217
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount the number of tuples after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
--- End diff --

`@return the new Stream`
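
For anyone checking the `slidingWindow(int windowCount, int slideCount, ...)` Javadoc, the count-based sliding behavior it describes can be modeled as below. This is a simplified in-memory sketch and not Trident's implementation. It makes three assumptions: the window is evaluated after every `slideCount` tuples, it holds at most the most recent `windowCount` tuples (so early windows can be smaller), and each window is reduced to a sum of integers. `SlidingCountSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Simplified model of a count-based sliding window (hypothetical, not Trident code).
 * Every slideCount values, the most recent (at most) windowCount values are summed.
 */
class SlidingCountSketch {
    static List<Integer> slidingSums(List<Integer> values, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be positive");
        }
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> sums = new ArrayList<>();
        int sinceLastSlide = 0;
        for (int v : values) {
            window.addLast(v);
            if (window.size() > windowCount) {
                window.removeFirst(); // evict the oldest value to keep at most windowCount
            }
            sinceLastSlide++;
            if (sinceLastSlide == slideCount) { // time to slide: evaluate the current window
                int sum = 0;
                for (int w : window) {
                    sum += w;
                }
                sums.add(sum);
                sinceLastSlide = 0;
            }
        }
        return sums;
    }
}
```

With values 1 to 6, `windowCount = 4` and `slideCount = 2`, the sketch returns `[3, 10, 18]`, from the windows {1, 2}, {1, 2, 3, 4} and {3, 4, 5, 6}.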


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143245
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount the number of tuples after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
--- End diff --

`@code slidingInterval`
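
The Javadoc being corrected here describes a window of length `windowDuration` that is re-evaluated every `slidingInterval`. The sketch below models that behavior in plain Java, under these assumptions:

- Each value carries an explicit millisecond timestamp, so the sketch runs without a clock. This is a modeling choice, not a claim about how Trident tracks time.
- Windows are evaluated at multiples of the slide up to a given end time.
- Each window covers the half-open range `[t - windowMs, t)`.

`SlidingDurationSketch` and its parameters are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of a duration-based sliding window (hypothetical, not Trident code).
 * At each evaluation time t = slideMs, 2 * slideMs, ... <= endMs, the values whose
 * timestamps fall in [t - windowMs, t) are summed.
 */
class SlidingDurationSketch {
    static List<Integer> slidingSums(long[] timestampsMs, int[] values,
                                     long windowMs, long slideMs, long endMs) {
        if (windowMs <= 0 || slideMs <= 0) {
            throw new IllegalArgumentException("windowMs and slideMs must be positive");
        }
        if (timestampsMs.length != values.length) {
            throw new IllegalArgumentException("timestamps and values must have the same length");
        }
        List<Integer> sums = new ArrayList<>();
        for (long t = slideMs; t <= endMs; t += slideMs) {
            int sum = 0;
            for (int i = 0; i < values.length; i++) {
                // include values that arrived within the last windowMs before t
                if (timestampsMs[i] >= t - windowMs && timestampsMs[i] < t) {
                    sum += values[i];
                }
            }
            sums.add(sum);
        }
        return sums;
    }
}
```

Take timestamps 100, 600, 1100 and 1600 ms with values 1 to 4, `windowMs = 1000`, `slideMs = 500` and `endMs = 2000`. The sketch returns `[1, 3, 5, 7]`. Because the window is twice the slide, each value can fall into up to two consecutive windows.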


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143185
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
--- End diff --

`@return the new Stream`
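
For reference while the Javadoc is being tightened, the count-based tumbling behavior that `tumblingWindow(int windowCount, ...)` describes can be modeled as below. This is a simplified in-memory sketch and not Trident's implementation. It assumes integer values, reduces each window to a sum, and drops a trailing partial window. `TumblingCountSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of a count-based tumbling window (hypothetical, not Trident code).
 * Every windowCount consecutive values form one non-overlapping window, reduced to its sum.
 */
class TumblingCountSketch {
    static List<Integer> tumblingSums(List<Integer> values, int windowCount) {
        if (windowCount <= 0) {
            throw new IllegalArgumentException("windowCount must be positive");
        }
        List<Integer> sums = new ArrayList<>();
        int sum = 0;
        int inWindow = 0;
        for (int v : values) {
            sum += v;
            inWindow++;
            if (inWindow == windowCount) { // window is full: emit it and start a fresh one
                sums.add(sum);
                sum = 0;
                inWindow = 0;
            }
        }
        // a trailing partial window is not emitted in this sketch
        return sums;
    }
}
```

With values 1 to 7 and `windowCount = 3`, it returns `[6, 15]`. The seventh value is still waiting for its window to fill.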


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143236
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount the number of tuples after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
--- End diff --

of a window that tumbles
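
A window that tumbles at `windowDuration` splits time into back-to-back, non-overlapping intervals. The sketch below models that with explicit millisecond timestamps so it can run without a clock. It is a simplified illustration, not how Trident assigns tuples to windows. `TumblingDurationSketch` is a hypothetical name; each window is keyed by its start time and reduced to a sum.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified model of a duration-based tumbling window (hypothetical, not Trident code).
 * A value with timestamp ts belongs to the window starting at floor(ts / durationMs) * durationMs.
 */
class TumblingDurationSketch {
    static Map<Long, Integer> tumblingSums(long[] timestampsMs, int[] values, long durationMs) {
        if (durationMs <= 0) {
            throw new IllegalArgumentException("durationMs must be positive");
        }
        if (timestampsMs.length != values.length) {
            throw new IllegalArgumentException("timestamps and values must have the same length");
        }
        Map<Long, Integer> sums = new TreeMap<>(); // sorted by window start time
        for (int i = 0; i < values.length; i++) {
            long start = Math.floorDiv(timestampsMs[i], durationMs) * durationMs;
            sums.merge(start, values[i], Integer::sum); // add the value to its window's running sum
        }
        return sums;
    }
}
```

Take timestamps 0, 400, 999, 1000 and 2500 ms with a 1000 ms duration. The values land in windows starting at 0, 1000 and 2000 ms. Unlike the sliding case, no value is counted twice.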


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143182
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
--- End diff --

no -> number


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143268
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount the number of tuples after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+ * and completes a window at {@code windowDuration}
+ *
+ * @param windowDuration represents window duration configuration
+ * @param slidingInterval the time duration after which the window slides
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
+WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingDurationWindow.of(windowDuration, slidingInterval), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of aggregated results based on the given window configuration which uses inmemory windowing tuple store.
+ *
+ * @param windowConfig window 

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57143132
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal<HTable> threadLocalHtable;
+private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
--- End diff --

could be final ?
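
In the code quoted above, the queue is assigned only at its declaration; afterwards only its contents change. Declaring it `final` therefore costs nothing and documents that intent.

The surrounding pattern works like this: each thread lazily gets its own resource, and every resource created is tracked so it can be released later. A plain-Java sketch of that pattern follows. `PerThreadResources`, `createdCount` and `shutdown` are hypothetical names for a generic stand-in for the `HTable` handling, not Storm code. Releasing through a caller-supplied callback is an assumption of the sketch.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: one lazily created resource per thread, with every created
 * resource tracked in a final, thread-safe queue so all of them can be released later.
 */
class PerThreadResources<T> {
    // final: the reference never changes after construction, only the queue's contents do
    private final Queue<T> created = new ConcurrentLinkedQueue<>();
    private final ThreadLocal<T> perThread;
    private final Consumer<T> releaser;

    PerThreadResources(Supplier<T> factory, Consumer<T> releaser) {
        this.releaser = releaser;
        this.perThread = ThreadLocal.withInitial(() -> {
            T resource = factory.get();
            created.add(resource); // track it so shutdown() can release it later
            return resource;
        });
    }

    /** Returns the calling thread's resource, creating it on first use. */
    T get() {
        return perThread.get();
    }

    int createdCount() {
        return created.size();
    }

    /** Releases every resource created so far, regardless of which thread created it. */
    void shutdown() {
        T resource;
        while ((resource = created.poll()) != null) {
            releaser.accept(resource);
        }
    }
}
```

Repeated `get()` calls on one thread return the same resource, and a second thread gets its own. `shutdown()` drains the queue, so both resources are released exactly once.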


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-23 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57113443
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal<HTable> threadLocalHtable;
+private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+this.family = family;
+this.qualifier = qualifier;
+
+threadLocalHtable = new ThreadLocal<HTable>() {
+@Override
+protected HTable initialValue() {
+try {
+HTable hTable = new HTable(config, tableName);
+htables.add(hTable);
+return hTable;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+};
+
+}
+
+private HTable htable() {
+return threadLocalHtable.get();
+}
+
+private byte[] effectiveKey(String key) {
+try {
+return key.getBytes(UTF_8);
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public Object get(String key) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] effectiveKey = effectiveKey(key);
+Get get = new Get(effectiveKey);
+Result result = null;
+try {
+result = htable().get(get);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+if(result.isEmpty()) {
+return null;
+}
+
+Kryo kryo = new Kryo();
+Input input = new Input(result.getValue(family, qualifier));
+Object resultObject = kryo.readClassAndObject(input);
+return resultObject;
+
+}
+
+@Override
+public Iterable<Object> get(List<String> keys) {
+List<Get> gets = new ArrayList<>();
+for (String key : keys) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] 

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-22 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57112111
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -565,19 +578,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount represents sliding count window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+ * and completes a window at {@code windowDuration}
+ *
+ * @param windowDuration represents window duration configuration
+ * @param slideDuration represents sliding duration  configuration
--- End diff --

I will add that.


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-22 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57111959
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -565,19 +578,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount represents sliding count window
--- End diff --

I used "count" here because it directly conveys that the value is a count. I will change the javadoc as you suggested to keep it in sync with the core API.
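For reference, the core-API wording proposed in this thread describes a sliding count window by a window length (tuples per window) and a sliding interval (the number of tuples after which the window slides). The sketch below models those semantics over a finite list, under the assumption that the window is evaluated after every `slideCount` tuples and holds the most recent `windowCount` tuples; `CountWindows` is a hypothetical helper, not Trident's `SlidingCountWindow`.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of count-based windows; not Trident's SlidingCountWindow code.
final class CountWindows {
    // After every slideCount-th tuple, the window holds the most recent
    // windowCount tuples seen so far (fewer while the stream is still short).
    static <T> List<List<T>> sliding(List<T> tuples, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int seen = slideCount; seen <= tuples.size(); seen += slideCount) {
            int from = Math.max(0, seen - windowCount);
            windows.add(new ArrayList<>(tuples.subList(from, seen)));
        }
        return windows;
    }

    // A tumbling window is the special case that slides by its full length,
    // so consecutive windows never overlap.
    static <T> List<List<T>> tumbling(List<T> tuples, int windowCount) {
        return sliding(tuples, windowCount, windowCount);
    }
}
```

With seven tuples `1..7`, `windowCount = 4` and `slideCount = 2`, this model yields `[1, 2]`, `[1, 2, 3, 4]` and `[3, 4, 5, 6]`; the seventh tuple waits for the next slide.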


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-22 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57111677
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sample application of trident windowing which uses inmemory store for storing tuples in window.
+ */
+public class TridentWindowingInmemoryStoreTopology {
+private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+
+public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+
+Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+new Split(), new Fields("word"))
+.window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//.aggregate(new CountAsAggregator(), new Fields("count"))
+.each(new Fields("count"), new Debug())
+.each(new Fields("count"), new Echo(), new Fields("ct"))
+.each(new Fields("ct"), new Debug());
--- End diff --

The peek API did not exist when this code was written. I will use the peek API here. :)


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-22 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57111526
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+public class TridentHBaseWindowingStoreTopology {
+private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+
+Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+new Split(), new Fields("word"))
+.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+.each(new Fields("count"), new Debug())
+.each(new Fields("count"), new Echo(), new Fields("ct"));
+
+return topology.build();
+}
+
+public static class Split extends BaseFunction {
+@Override
+public void execute(TridentTuple tuple, TridentCollector collector) {
+String sentence = tuple.getString(0);
+for (String word : sentence.split(" ")) {
+collector.emit(new Values(word));
+}
+}
+}
+
+static class Echo implements Function {
+
+@Override
+public void execute(TridentTuple tuple, TridentCollector collector) {
+LOG.info("##Echo.execute: " + tuple);
+collector.emit(tuple.getValues());
+}
+
+@Override
+public void prepare(Map conf, TridentOperationContext context) {
+
+}
+
+@Override
+public void cleanup() {
+
+}
+}
+
+public static void main(String[] args) throws Exception {
+Config conf = new Config();
+

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-22 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r57110471
  
--- Diff: examples/storm-starter/pom.xml ---
@@ -149,6 +152,11 @@
   storm-hdfs
   ${project.version}
 
+  
+  org.apache.storm
--- End diff --

This is the storm-hbase module version, not hbase.version.


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769644
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -565,19 +578,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount represents sliding count window
--- End diff --

Keep it in sync with the core API: "@param slidingInterval the number of tuples after which the window slides"


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769652
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java ---
@@ -40,7 +42,7 @@ public Debug(String name) {
 
 @Override
 public boolean isKeep(TridentTuple tuple) {
-System.out.println(name + tuple.toString());
+System.out.println("<"+new Date()+"> "+name + tuple.toString());
--- End diff --

Instead of modifying Debug, you could use the peek API, where you can specify custom actions.
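Following this suggestion, the timestamp prefix can live in a caller-supplied action instead of inside `Debug`. To keep the sketch runnable without Storm on the classpath, it uses `java.util.function.Consumer<Object>` as a stand-in for the consumer type that Trident's peek operation accepts, and a plain object in place of a `TridentTuple`; `DebugActions` and `timestampedPrinter` are hypothetical names.

```java
import java.util.Date;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative only: java.util.function.Consumer and Object stand in for
// Trident's own consumer interface and TridentTuple.
final class DebugActions {
    // Builds an action that writes "<timestamp> " + name + tuple to the sink,
    // the same format as the modified Debug.isKeep in the diff above.
    static Consumer<Object> timestampedPrinter(String name, Supplier<Date> clock, Consumer<String> sink) {
        return tuple -> sink.accept("<" + clock.get() + "> " + name + tuple);
    }

    // Convenience overload: current wall-clock time, printed to stdout.
    static Consumer<Object> timestampedPrinter(String name) {
        return timestampedPrinter(name, Date::new, System.out::println);
    }
}
```

Injecting the clock and the output sink keeps the action testable; in a topology, an equivalent lambda body would be passed to the stream's peek operation rather than hard-coded into a built-in filter.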


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769635
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
--- End diff --

This is not part of the license; you might want to remove it.


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769629
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal<HTable> threadLocalHtable;
+private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+this.family = family;
+this.qualifier = qualifier;
+
+threadLocalHtable = new ThreadLocal<HTable>() {
+@Override
+protected HTable initialValue() {
+try {
+HTable hTable = new HTable(config, tableName);
+htables.add(hTable);
+return hTable;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+};
+
+}
+
+private HTable htable() {
+return threadLocalHtable.get();
+}
+
+private byte[] effectiveKey(String key) {
+try {
+return key.getBytes(UTF_8);
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public Object get(String key) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] effectiveKey = effectiveKey(key);
+Get get = new Get(effectiveKey);
+Result result = null;
+try {
+result = htable().get(get);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+if(result.isEmpty()) {
+return null;
+}
+
+Kryo kryo = new Kryo();
+Input input = new Input(result.getValue(family, qualifier));
+Object resultObject = kryo.readClassAndObject(input);
+return resultObject;
+
+}
+
+@Override
+public Iterable<Object> get(List<String> keys) {
+List<Get> gets = new ArrayList<>();
+for (String key : keys) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] 

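`effectiveKey` in the quoted store resolves the charset by the name `"utf-8"`, which forces a catch of the checked `UnsupportedEncodingException` even though every Java platform is required to support UTF-8. A small sketch of the alternative with `java.nio.charset.StandardCharsets` (Java 7 and later) follows; `KeyCodec`, `toRowKey` and `fromRowKey` are hypothetical names, not part of `HBaseWindowsStore`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical helper; not part of HBaseWindowsStore.
final class KeyCodec {
    // Encodes a window-store key as UTF-8 bytes for use as an HBase row key.
    // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException.
    static byte[] toRowKey(String key) {
        Objects.requireNonNull(key, "key");
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes a row key back into the original string key.
    static String fromRowKey(byte[] rowKey) {
        return new String(rowKey, StandardCharsets.UTF_8);
    }
}
```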
[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769646
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -565,19 +578,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents no of tuples in the window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
+  Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
+ * and slides the window with {@code slideCount}.
+ *
+ * @param windowCount represents tuples count of a window
+ * @param slideCount represents sliding count window
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+ *
+ * @param windowDuration represents tumbling window duration configuration
+ * @param windowStoreFactory intermediary tuple store for storing windowing tuples
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
+ Fields inputFields, Aggregator aggregator, Fields functionFields) {
+return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
+}
+
+/**
+ * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+ * and completes a window at {@code windowDuration}
+ *
+ * @param windowDuration represents window duration configuration
+ * @param slideDuration represents sliding duration  configuration
--- End diff --

Keep it in sync with the core API: "@param slidingInterval the time duration after which the window slides"
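With the suggested wording, a sliding duration window is defined by a window length (the time span it covers) and a sliding interval (the time after which it slides). The sketch below assigns event timestamps to such windows under assumed boundary conventions: evaluations at `slideMs`, `2 * slideMs`, and so on, each covering the half-open interval `(t - lengthMs, t]`, with millisecond `long` timestamps. `DurationWindows` is a hypothetical helper and does not reproduce Trident's `SlidingDurationWindow` or its trigger logic.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of time-based sliding windows; boundary conventions are assumptions.
final class DurationWindows {
    // For each evaluation time t = slideMs, 2*slideMs, ... up to maxTimeMs,
    // collects the timestamps that fall in the half-open interval (t - lengthMs, t].
    static List<List<Long>> sliding(List<Long> timestampsMs, long lengthMs, long slideMs, long maxTimeMs) {
        if (lengthMs <= 0 || slideMs <= 0) {
            throw new IllegalArgumentException("lengthMs and slideMs must be positive");
        }
        List<List<Long>> windows = new ArrayList<>();
        for (long t = slideMs; t <= maxTimeMs; t += slideMs) {
            List<Long> window = new ArrayList<>();
            for (long ts : timestampsMs) {
                if (ts > t - lengthMs && ts <= t) {
                    window.add(ts);
                }
            }
            windows.add(window);
        }
        return windows;
    }
}
```

With events at 100, 900, 1500, 2100 and 2900 ms, a 2000 ms window and a 1000 ms slide, the evaluations at 1000, 2000 and 3000 ms contain `[100, 900]`, `[100, 900, 1500]` and `[1500, 2100, 2900]`, so consecutive windows overlap by one slide.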


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769600
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+public class TridentHBaseWindowingStoreTopology {
+private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+
+Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+new Split(), new Fields("word"))
+.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+.each(new Fields("count"), new Debug())
+.each(new Fields("count"), new Echo(), new Fields("ct"));
+
+return topology.build();
+}
+
+public static class Split extends BaseFunction {
+@Override
+public void execute(TridentTuple tuple, TridentCollector collector) {
+String sentence = tuple.getString(0);
+for (String word : sentence.split(" ")) {
+collector.emit(new Values(word));
+}
+}
+}
+
+static class Echo implements Function {
+
+@Override
+public void execute(TridentTuple tuple, TridentCollector collector) {
+LOG.info("##Echo.execute: " + tuple);
+collector.emit(tuple.getValues());
+}
+
+@Override
+public void prepare(Map conf, TridentOperationContext context) {
+
+}
+
+@Override
+public void cleanup() {
+
+}
+}
+
+public static void main(String[] args) throws Exception {
+Config conf = new Config();
+ 

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769622
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sample application of trident windowing which uses inmemory store for storing tuples in window.
+ */
+public class TridentWindowingInmemoryStoreTopology {
+private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+
+public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+
+Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+new Split(), new Fields("word"))
+.window(windowConfig, windowStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//.aggregate(new CountAsAggregator(), new Fields("count"))
+.each(new Fields("count"), new Debug())
+.each(new Fields("count"), new Echo(), new Fields("ct"))
+.each(new Fields("ct"), new Debug());
+
+return topology.build();
+}
+
+public static class Split extends BaseFunction {
--- End diff --

Why not reuse org.apache.storm.trident.testing.Split?




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769608
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+public class TridentHBaseWindowingStoreTopology {
+private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+
+Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+new Split(), new Fields("word"))
+.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+.each(new Fields("count"), new Debug())
+.each(new Fields("count"), new Echo(), new Fields("ct"));
+
+return topology.build();
+}
+
+public static class Split extends BaseFunction {
+@Override
+public void execute(TridentTuple tuple, TridentCollector collector) {
+String sentence = tuple.getString(0);
+for (String word : sentence.split(" ")) {
+collector.emit(new Values(word));
+}
+}
+}
+
+static class Echo implements Function {
+
+@Override
+public void execute(TridentTuple tuple, TridentCollector collector) {
+LOG.info("##Echo.execute: " + tuple);
+collector.emit(tuple.getValues());
+}
+
+@Override
+public void prepare(Map conf, TridentOperationContext context) {
+
+}
+
+@Override
+public void cleanup() {
+
+}
+}
+
+public static void main(String[] args) throws Exception {
+Config conf = new Config();
+ 

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769598
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+public class TridentHBaseWindowingStoreTopology {
+private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+
+Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+new Split(), new Fields("word"))
+.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+.each(new Fields("count"), new Debug())
+.each(new Fields("count"), new Echo(), new Fields("ct"));
+
+return topology.build();
+}
+
+public static class Split extends BaseFunction {
--- End diff --

Why not reuse org.apache.storm.trident.testing.Split?




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769591
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+public class TridentHBaseWindowingStoreTopology {
+private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+
+public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+
+Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
+new Split(), new Fields("word"))
+.window(TumblingCountWindow.of(1000), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
+//.tumblingTimeWindow(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS), windowsStore, new Fields("word"), new CountAsAggregator(), new Fields("count"))
--- End diff --

Remove the comment. If you want to illustrate a time window, it's better to write a separate example.
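As an illustration of what such a separate time-window example would demonstrate, here is a minimal plain-Java sketch of tumbling time windows. It does not use the Storm or Trident API; the class and method names are hypothetical. Each event timestamp falls into exactly one fixed-length window, and events are counted per window, which is roughly what a 3-second tumbling window with a count aggregator computes.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (plain Java, not the Storm API): assigns event timestamps
 * to tumbling time windows of a fixed length and counts events per window.
 */
class TumblingTimeWindowSketch {
    /** Returns the start (inclusive) of the tumbling window that contains timestampMs. */
    static long windowStart(long timestampMs, long windowLengthMs) {
        if (windowLengthMs <= 0) {
            throw new IllegalArgumentException("windowLengthMs must be positive");
        }
        // floorDiv keeps window alignment correct even for negative timestamps.
        return Math.floorDiv(timestampMs, windowLengthMs) * windowLengthMs;
    }

    /** Counts events per window, keyed by window start, in ascending key order. */
    static Map<Long, Integer> countPerWindow(long[] timestampsMs, long windowLengthMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, windowLengthMs), 1, Integer::sum);
        }
        return counts;
    }
}
```

With a 3000 ms window, events at 100 ms and 2500 ms share the window starting at 0, while 3100 ms and 7000 ms land in the windows starting at 3000 and 6000.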




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769585
  
--- Diff: examples/storm-starter/pom.xml ---
@@ -177,6 +185,36 @@
   storm-redis
   ${project.version}
 
+  
+  org.apache.hbase
--- End diff --

Why is the hbase-server dependency needed? storm-hbase already includes the hbase-server dependency, so this one could be removed.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56769580
  
--- Diff: examples/storm-starter/pom.xml ---
@@ -31,10 +31,13 @@
   storm-starter
 
   
- UTF-8
- 
- provided
+UTF-8
+
+provided
+0.98.4-hadoop2
+1.1.2
--- End diff --

hbase.version is defined twice




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-16 Thread arunmahadevan
Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-197175210
  
@harshach I haven't reviewed the code yet, plan to do that within a couple 
of days.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-15 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-196884101
  
+1. Will wait for @arunmahadevan.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-15 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r56130662
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -565,19 +578,169 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents window tuples count
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
--- End diff --

@arunmahadevan Changed the API to align with the core windowing API.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-14 Thread satishd
Github user satishd commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-196317380
  
@arunmahadevan What is the batch emit interval in your topology configuration? Each triggered result is emitted as part of the nearest completed batch: Trident runs in batches, so an emitted trigger becomes part of the latest completed batch. These details are described in the design doc attached to STORM-676. That is why the results may have been emitted only at the nearest completed batches.
Can you please share your configuration so that I can confirm the behavior?
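A toy plain-Java model can make this explanation concrete. It rests on one assumption, which is an interpretation and not a description of the actual implementation: a window trigger that fires at time t leaves the operator with the first batch that completes at or after t. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model (not Storm code): tumbling-window triggers fire every
 * windowLengthSecs, but a fired result is only emitted with the next batch
 * that completes at or after the trigger time.
 */
class BatchAlignedEmissionSketch {
    /**
     * Maps each batch completion time to the trigger times emitted with it.
     * batchCompletionSecs is assumed to be sorted in ascending order.
     */
    static Map<Long, List<Long>> emissions(long windowLengthSecs, long[] batchCompletionSecs) {
        if (windowLengthSecs <= 0) {
            throw new IllegalArgumentException("windowLengthSecs must be positive");
        }
        Map<Long, List<Long>> emitted = new LinkedHashMap<>();
        List<Long> pending = new ArrayList<>();
        long nextTrigger = windowLengthSecs;
        for (long batchTime : batchCompletionSecs) {
            // Every window that closed by now waits for this batch to carry it out.
            while (nextTrigger <= batchTime) {
                pending.add(nextTrigger);
                nextTrigger += windowLengthSecs;
            }
            if (!pending.isEmpty()) {
                emitted.put(batchTime, new ArrayList<>(pending));
                pending.clear();
            }
        }
        // Triggers that fire after the last completed batch are never emitted in this model.
        return emitted;
    }
}
```

Under this model, 3-second windows with batches completing every 5 seconds surface only at 5, 10 and 15 seconds, the 6 s and 9 s windows share the batch completing at 10 s, and a spout that emits a single batch and then sleeps never carries out any later trigger.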




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on the pull request:

https://github.com/apache/storm/pull/1072#issuecomment-196279990
  
Haven't gone through the code in great detail, but when I ran a simple topology with a tumbling time window of 3 secs:

- If the spout emits a batch and sleeps, the result of the window aggregation is never emitted (ideally it should have been emitted at 3 secs).

- If the spout periodically emits batches (one batch every 5 secs), the time when the window output is generated always coincides with the time the spout emits the batches (i.e. at 10, 15, 20 secs), whereas one would expect the result to be emitted at 6, 12, 15, 18 etc. Also, only one result is emitted at time t = 10 secs, whereas two batches are complete at this time.

Can you verify whether this is the current behavior and, if so, fix it?




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-14 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55989920
  
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -565,19 +578,169 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
 .aggregate(inputFields, agg, functionFields)
 .chainEnd();
 }
-
+
+/**
+ * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
+ *
+ * @param windowCount represents window tuples count
+ * @param inputFields projected fields for aggregator
+ * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
+ * @param functionFields fields of values to emit with aggregation.
+ *
+ * @return
+ */
+public Stream tumblingCountWindow(int windowCount, Fields inputFields, Aggregator aggregator, Fields functionFields) {
--- End diff --

The various windowing configurations can be expressed with `window(WindowConfig windowConfig, ...)`. If we are adding wrappers over it, can you please maintain compatibility with the core APIs?

i.e.,
```java
tumblingWindow(Count count)
tumblingWindow(Duration duration)
window(Count windowLength, Count slidingInterval)
window(Duration windowLength, Duration slidingInterval)
```
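The four overloads above reduce to two parameters: a window length and a sliding interval. Below is a plain-Java sketch of count-based window semantics as they are commonly defined: every `slidingInterval` tuples, the most recent `windowLength` tuples are evaluated, and a tumbling window is the special case where the two are equal. It is illustrative only. `CountWindowSketch` is a hypothetical name and this is not Storm's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical sketch of count-based windowing semantics (not the Storm API):
 * every slidingInterval tuples, the last windowLength tuples form one window.
 */
class CountWindowSketch {
    static <T> List<List<T>> windows(List<T> tuples, int windowLength, int slidingInterval) {
        if (windowLength <= 0 || slidingInterval <= 0) {
            throw new IllegalArgumentException("window length and sliding interval must be positive");
        }
        Deque<T> buffer = new ArrayDeque<>();
        List<List<T>> result = new ArrayList<>();
        int sinceLastTrigger = 0;
        for (T t : tuples) {
            buffer.addLast(t);
            // Retain only the windowLength most recent tuples.
            if (buffer.size() > windowLength) {
                buffer.removeFirst();
            }
            // Trigger an evaluation once every slidingInterval tuples.
            if (++sinceLastTrigger == slidingInterval) {
                result.add(new ArrayList<>(buffer));
                sinceLastTrigger = 0;
            }
        }
        return result;
    }

    /** A tumbling window: the sliding interval equals the window length. */
    static <T> List<List<T>> tumbling(List<T> tuples, int count) {
        return windows(tuples, count, count);
    }
}
```

For the tuples 1..6, a tumbling window of 2 yields `[[1, 2], [3, 4], [5, 6]]`, while a window of 3 sliding by 2 yields overlapping windows `[[1, 2], [2, 3, 4], [4, 5, 6]]`.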




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-13 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55941142
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+private final Map config;
+private final String tableName;
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStoreFactory(Map config, String tableName, byte[] family, byte[] qualifier) {
--- End diff --

@satishd Aren't the table name and column family specific to HBase? In that case, a config method can take a map and do its own validation, and that validation would be specific to HBaseWindowsStoreFactory.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-13 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55941127
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal<HTable> threadLocalHtable;
+private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+this.family = family;
+this.qualifier = qualifier;
+
+threadLocalHtable = new ThreadLocal<HTable>() {
+@Override
+protected HTable initialValue() {
+try {
+HTable hTable = new HTable(config, tableName);
+htables.add(hTable);
+return hTable;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+};
+
+}
+
+private HTable htable() {
+return threadLocalHtable.get();
+}
+
+private byte[] effectiveKey(String key) {
+try {
+return key.getBytes(UTF_8);
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public Object get(String key) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] effectiveKey = effectiveKey(key);
+Get get = new Get(effectiveKey);
+Result result = null;
+try {
+result = htable().get(get);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+if(result.isEmpty()) {
+return null;
+}
+
+Kryo kryo = new Kryo();
+Input input = new Input(result.getValue(family, qualifier));
+Object resultObject = kryo.readClassAndObject(input);
+return resultObject;
+
+}
+
+@Override
+public Iterable get(List keys) {
+List gets = new ArrayList<>();
+for (String key : keys) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] 
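The quoted `HBaseWindowsStore` keeps one `HTable` per thread through a `ThreadLocal` and also records every created instance in a `ConcurrentLinkedQueue`, presumably so that all of them can be closed later, since an `HTable` is not meant to be shared between threads. Below is a generic plain-Java sketch of that pattern. `PerThreadResources` is a hypothetical name and `AutoCloseable` stands in for `HTable`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the per-thread resource pattern: each thread lazily
 * gets its own instance, and every instance is also registered centrally so
 * that shutdown code can close all of them.
 */
final class PerThreadResources<R extends AutoCloseable> {
    private final Queue<R> created = new ConcurrentLinkedQueue<>();
    private final ThreadLocal<R> perThread;

    PerThreadResources(Supplier<R> factory) {
        // The initializer runs once per thread, on that thread's first get().
        this.perThread = ThreadLocal.withInitial(() -> {
            R resource = factory.get();
            created.add(resource);
            return resource;
        });
    }

    /** Returns the calling thread's instance, creating it on first use. */
    R get() {
        return perThread.get();
    }

    int createdCount() {
        return created.size();
    }

    /** Closes every instance created so far, e.g. from a cleanup/shutdown method. */
    void closeAll() throws Exception {
        R resource;
        while ((resource = created.poll()) != null) {
            resource.close();
        }
    }
}
```

Repeated `get()` calls on the same thread return the same instance. Each new thread gets (and registers) its own.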

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-13 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55934087
  
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+private final Map config;
+private final String tableName;
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStoreFactory(Map config, String tableName, byte[] family, byte[] qualifier) {
--- End diff --

@harshach All of these fields are required by the windows store factory and none of them is optional, which is why they are passed through the constructor. We could wrap them in a configuration class (with immutable fields) and pass that to the constructor, as is done for HBaseStateFactory (Options).
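A minimal sketch of the "configuration class with immutable fields" idea, assuming a plain Java holder. `WindowsStoreOptions` and `OptionsBasedStoreFactory` are hypothetical names for illustration only, not classes from Storm or this PR. Required settings are validated once at construction and the byte arrays are defensively copied, so the factory can never observe a half-configured or later-mutated state.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical immutable holder for the required settings of an HBase-backed
 * windows store factory (names are illustrative, not Storm API).
 */
final class WindowsStoreOptions {
    private final Map<String, Object> config;
    private final String tableName;
    private final byte[] family;
    private final byte[] qualifier;

    WindowsStoreOptions(Map<String, Object> config, String tableName, byte[] family, byte[] qualifier) {
        // Every setting is mandatory, so fail fast on null instead of at first use.
        this.config = Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(config, "config")));
        this.tableName = Objects.requireNonNull(tableName, "tableName");
        // Defensive copies keep callers from mutating the arrays after construction.
        this.family = Objects.requireNonNull(family, "family").clone();
        this.qualifier = Objects.requireNonNull(qualifier, "qualifier").clone();
    }

    Map<String, Object> config() { return config; }
    String tableName() { return tableName; }
    // Return copies so the holder stays immutable.
    byte[] family() { return family.clone(); }
    byte[] qualifier() { return qualifier.clone(); }
}

/** Hypothetical factory that receives all required settings as one immutable object. */
final class OptionsBasedStoreFactory {
    private final WindowsStoreOptions options;

    OptionsBasedStoreFactory(WindowsStoreOptions options) {
        this.options = Objects.requireNonNull(options, "options");
    }

    WindowsStoreOptions options() { return options; }
}
```

Compared with a four-argument constructor, the holder keeps the factory's signature stable if more mandatory settings are added later.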




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-13 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55933892
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
 ---
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
--- End diff --

Updated with the missing Javadoc.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55932588
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
 ---
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+/**
+ * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ *
+ */
+public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
--- End diff --

@harshach Each InMemoryWindowsStoreFactory instance always holds the same inMemoryStore instance. That same InMemoryWindowsStoreFactory instance is passed to WindowsStateUpdater, which removes successfully emitted triggers from the same inMemoryWindowsStore instance in WindowsStateUpdater#updateState(WindowsState, List, TridentCollector).

The code snippet below is from Stream#window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields, boolean storeTuplesInStore):
``` java
// when storeTuplesInStore is false then the given windowStoreFactory is only used to store triggers and
// that store is passed to WindowsStateUpdater to remove them after committing the batch.
Stream stream = _topology.addSourcedNode(this,
        new ProcessorNode(_topology.getUniqueStreamId(),
                _name,
                fields,
                fields,
                new WindowTridentProcessor(windowConfig, _topology.getUniqueWindowId(), windowStoreFactory,
                        inputFields, aggregator, storeTuplesInStore)));

Stream effectiveStream = stream.project(functionFields);

// create StateUpdater with the given windowStoreFactory to remove triggered aggregation results from store
// when they are successfully processed.
StateFactory stateFactory = new WindowsStateFactory();
StateUpdater stateUpdater = new WindowsStateUpdater(windowStoreFactory);
stream.partitionPersist(stateFactory, new Fields(WindowTridentProcessor.TRIGGER_FIELD_NAME), stateUpdater, new Fields());
```

Updated the Javadoc with the details below.

``` java
/**
 * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for
 * storing tuples and triggers of the window. The same InMemoryWindowsStoreFactory instance is passed to {@link WindowsStateUpdater},
 * which removes successfully emitted triggers from the same {@code inMemoryWindowsStore} instance in
 * {@link WindowsStateUpdater#updateState(WindowsState, List, TridentCollector)}.
 *
 */
public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {

```
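A simplified sketch of why sharing one factory instance matters: the component that writes triggers and the component that removes them after a commit both call `create()` on the same factory, so they operate on the same store. All classes here (`SimpleInMemoryStore`, `SharedStoreFactory`, `TriggerWriter`, `CommittedTriggerRemover`) and the `"trigger|<id>"` key format are hypothetical stand-ins for `InMemoryWindowsStore`, `WindowTridentProcessor` and `WindowsStateUpdater`; serialization and Trident batching are deliberately ignored.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for an in-memory windows store. */
final class SimpleInMemoryStore {
    private final Map<String, Object> entries = new ConcurrentHashMap<>();
    void put(String key, Object value) { entries.put(key, value); }
    Object get(String key) { return entries.get(key); }
    void remove(String key) { entries.remove(key); }
    int size() { return entries.size(); }
}

/** Every create() call on a given factory instance returns the same store. */
final class SharedStoreFactory {
    private final SimpleInMemoryStore store = new SimpleInMemoryStore();
    SimpleInMemoryStore create() { return store; }
}

/** Plays the role of the windowing processor: records a triggered aggregation result. */
final class TriggerWriter {
    private final SimpleInMemoryStore store;
    TriggerWriter(SharedStoreFactory factory) { this.store = factory.create(); }
    void emitTrigger(int triggerId, Object result) { store.put("trigger|" + triggerId, result); }
}

/** Plays the role of the state updater: removes a trigger once its batch is committed. */
final class CommittedTriggerRemover {
    private final SharedStoreFactory factory;
    CommittedTriggerRemover(SharedStoreFactory factory) { this.factory = factory; }
    void onCommit(int triggerId) { factory.create().remove("trigger|" + triggerId); }
}
```

Because both sides receive the same `SharedStoreFactory`, a removal in `onCommit` is immediately visible to the writer's store; with two separate factory instances it would silently remove nothing.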




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55932046
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
 ---
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerPolicy;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.apache.storm.windowing.WindowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing
+ * tuples and triggers related information.
+ *
+ */
+public abstract class AbstractTridentWindowManager implements ITridentWindowManager {
+private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+
+protected final WindowManager windowManager;
+protected final Aggregator aggregator;
+protected final BatchOutputCollector delegateCollector;
+protected final String windowTaskId;
+protected final WindowsStore windowStore;
+
+protected final Set activeBatches = new HashSet<>();
+protected final Queue pendingTriggers = new ConcurrentLinkedQueue<>();
+protected final AtomicInteger triggerId = new AtomicInteger();
+private final String windowTriggerCountId;
+private final TriggerPolicy triggerPolicy;
+
+public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
+Aggregator aggregator, BatchOutputCollector delegateCollector) {
+this.windowTaskId = windowTaskId;
+this.windowStore = windowStore;
+this.aggregator = aggregator;
+this.delegateCollector = delegateCollector;
+
+windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
+
+windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
+
+WindowStrategy windowStrategy = WindowStrategyFactory.create(windowConfig);
+EvictionPolicy evictionPolicy = windowStrategy.getEvictionPolicy();
+windowManager.setEvictionPolicy(evictionPolicy);
+triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
+windowManager.setTriggerPolicy(triggerPolicy);
+}
+
+@Override
+public void prepare() {
+preInitialize();
+
+initialize();
+
+postInitialize();
+}
+
+private void preInitialize() {
+log.debug("Getting current trigger count for this component/task");
+// get trigger count value from store
+Object result = windowStore.get(windowTriggerCountId);
+Integer currentCount = 0;
+if(result == null) {
+log.info("No current trigger count in windows store.");
+} else {
+currentCount = (Integer) result + 1;
+}
+

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55930607
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStore.java ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Store for storing window related entities like windowed tuples, triggers etc.
+ *
+ */
+public interface WindowsStore extends Serializable {
+
+/**
+ * This can be used as a separator while generating a key from sequence of strings.
+ */
+public static final String KEY_SEPARATOR = "|";
--- End diff --

It's good to have a configure method that takes a Map config rather than doing this via the constructor.
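A minimal sketch of the configure-method style suggested here, assuming a store that is constructed with no arguments and receives its settings later from a `Map`. `ConfigurableStore`, `TableBackedStoreSketch` and the config keys `windows.store.table` / `windows.store.family` are hypothetical, not part of Storm's `WindowsStore` interface. The trade-off is that misuse (calling before `configure`) can only be detected at runtime, so the sketch guards for it explicitly.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical store contract: no-arg construction, settings supplied later via configure(). */
interface ConfigurableStore {
    void configure(Map<String, Object> conf);
}

final class TableBackedStoreSketch implements ConfigurableStore {
    // Hypothetical config key names, not defined by Storm.
    static final String TABLE_KEY = "windows.store.table";
    static final String FAMILY_KEY = "windows.store.family";

    private String tableName;
    private String family;

    @Override
    public void configure(Map<String, Object> conf) {
        Objects.requireNonNull(conf, "conf");
        // Validate required keys up front so a bad config fails at configure time.
        this.tableName = requireString(conf, TABLE_KEY);
        this.family = requireString(conf, FAMILY_KEY);
    }

    String tableName() {
        checkConfigured();
        return tableName;
    }

    String family() {
        checkConfigured();
        return family;
    }

    private void checkConfigured() {
        if (tableName == null) {
            throw new IllegalStateException("configure() has not been called");
        }
    }

    private static String requireString(Map<String, Object> conf, String key) {
        Object value = conf.get(key);
        if (!(value instanceof String) || ((String) value).isEmpty()) {
            throw new IllegalArgumentException("missing or empty config key: " + key);
        }
        return (String) value;
    }
}
```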




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55930541
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
 ---
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+/**
+ * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ *
+ */
+public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
--- End diff --

Is this intended to be a singleton?
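To make the distinction behind this question concrete, here is a sketch contrasting a store held per factory instance with a JVM-wide static singleton. Both factory classes are hypothetical and use a plain `ConcurrentHashMap` as the "store"; they only illustrate the two sharing scopes, not how `InMemoryWindowsStoreFactory` is actually implemented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** One store per factory instance: two factories means two independent stores. */
final class PerInstanceStoreFactory {
    private final Map<String, Object> store = new ConcurrentHashMap<>();
    Map<String, Object> create() { return store; }
}

/** JVM-wide singleton: every factory instance hands out the same static store. */
final class GlobalSingletonStoreFactory {
    private static final Map<String, Object> STORE = new ConcurrentHashMap<>();
    Map<String, Object> create() { return STORE; }
}
```

With the per-instance variant, two windowed streams that use different factory objects cannot see each other's tuples or triggers; a static singleton would mix them unless keys were namespaced.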




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55930526
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
 ---
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.coordination.BatchOutputCollector;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
+import org.apache.storm.windowing.EvictionPolicy;
+import org.apache.storm.windowing.TriggerPolicy;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.apache.storm.windowing.WindowManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic functionality to manage trident tuple events using {@code WindowManager} and {@code WindowsStore} for storing
+ * tuples and triggers related information.
+ *
+ */
+public abstract class AbstractTridentWindowManager implements ITridentWindowManager {
+private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+
+protected final WindowManager windowManager;
+protected final Aggregator aggregator;
+protected final BatchOutputCollector delegateCollector;
+protected final String windowTaskId;
+protected final WindowsStore windowStore;
+
+protected final Set activeBatches = new HashSet<>();
+protected final Queue pendingTriggers = new ConcurrentLinkedQueue<>();
+protected final AtomicInteger triggerId = new AtomicInteger();
+private final String windowTriggerCountId;
+private final TriggerPolicy triggerPolicy;
+
+public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
+Aggregator aggregator, BatchOutputCollector delegateCollector) {
+this.windowTaskId = windowTaskId;
+this.windowStore = windowStore;
+this.aggregator = aggregator;
+this.delegateCollector = delegateCollector;
+
+windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
+
+windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
+
+WindowStrategy windowStrategy = WindowStrategyFactory.create(windowConfig);
+EvictionPolicy evictionPolicy = windowStrategy.getEvictionPolicy();
+windowManager.setEvictionPolicy(evictionPolicy);
+triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
+windowManager.setTriggerPolicy(triggerPolicy);
+}
+
+@Override
+public void prepare() {
+preInitialize();
+
+initialize();
+
+postInitialize();
+}
+
+private void preInitialize() {
+log.debug("Getting current trigger count for this component/task");
+// get trigger count value from store
+Object result = windowStore.get(windowTriggerCountId);
+Integer currentCount = 0;
+if(result == null) {
+log.info("No current trigger count in windows store.");
+} else {
+currentCount = (Integer) result + 1;
+}
+

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55930477
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
 ---
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+private final Map config;
+private final String tableName;
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStoreFactory(Map config, String tableName, byte[] family, byte[] qualifier) {
--- End diff --

It would be better to do this via a config method rather than the constructor.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55930467
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
 ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal threadLocalHtable;
+private Queue htables = new ConcurrentLinkedQueue<>();
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+this.family = family;
+this.qualifier = qualifier;
+
+threadLocalHtable = new ThreadLocal() {
+@Override
+protected HTable initialValue() {
+try {
+HTable hTable = new HTable(config, tableName);
+htables.add(hTable);
+return hTable;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+};
+
+}
+
+private HTable htable() {
+return threadLocalHtable.get();
+}
+
+private byte[] effectiveKey(String key) {
+try {
+return key.getBytes(UTF_8);
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public Object get(String key) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] effectiveKey = effectiveKey(key);
+Get get = new Get(effectiveKey);
+Result result = null;
+try {
+result = htable().get(get);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+if(result.isEmpty()) {
+return null;
+}
+
+Kryo kryo = new Kryo();
+Input input = new Input(result.getValue(family, qualifier));
+Object resultObject = kryo.readClassAndObject(input);
+return resultObject;
+
+}
+
+@Override
+public Iterable get(List keys) {
+List gets = new ArrayList<>();
+for (String key : keys) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] 

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55930433
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
 ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into hbase instance of the given configuration.
+ *
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal threadLocalHtable;
+private Queue htables = new ConcurrentLinkedQueue<>();
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStore(final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
--- End diff --

It looks like it would be better for this class to have its own config method that takes a Map config object to configure it.
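A sketch of how the constructor's per-thread `HTable` pattern could sit behind a config method, using only the JDK. `FakeTableClient` is a hypothetical stand-in for a table client that should not be shared across threads, and the `windows.store.table` key is an assumption; the structure mirrors the diff (a `ThreadLocal` client plus a `ConcurrentLinkedQueue` tracking every client created) but uses `ThreadLocal.withInitial` and adds a `close()` that releases all tracked clients.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for a non-thread-safe table client. */
final class FakeTableClient implements AutoCloseable {
    final String tableName;
    volatile boolean closed;
    FakeTableClient(String tableName) { this.tableName = tableName; }
    @Override public void close() { closed = true; }
}

final class PerThreadClientStore implements AutoCloseable {
    // Every client created by any thread is tracked so close() can release them all.
    private final Queue<FakeTableClient> allClients = new ConcurrentLinkedQueue<>();
    private volatile ThreadLocal<FakeTableClient> clients;

    /** Hypothetical config method: reads the table name from a Map instead of a constructor argument. */
    void configure(Map<String, Object> conf) {
        final String tableName = (String) conf.get("windows.store.table"); // hypothetical key
        if (tableName == null) {
            throw new IllegalArgumentException("windows.store.table is required");
        }
        // Each thread lazily gets its own client on first use.
        clients = ThreadLocal.withInitial(() -> {
            FakeTableClient client = new FakeTableClient(tableName);
            allClients.add(client);
            return client;
        });
    }

    FakeTableClient client() {
        ThreadLocal<FakeTableClient> local = clients;
        if (local == null) {
            throw new IllegalStateException("configure() has not been called");
        }
        return local.get();
    }

    int clientCount() { return allClients.size(); }

    /** Closes every client created so far; the store is not meant to be reused afterwards. */
    @Override
    public void close() {
        FakeTableClient c;
        while ((c = allClients.poll()) != null) {
            c.close();
        }
    }
}
```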




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55930422
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
 ---
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
--- End diff --

nit: please remove the empty comments.




[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-12 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55930081
  
--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java ---
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class TridentWindowingInmemoryStoreTopology {
+private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
--- End diff --

In Storm we always use the capitalized LOG. Can you please make sure this is reflected across the patch?
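For reference, the requested convention amounts to naming the `static final` logger field in upper case. A minimal sketch with a hypothetical class name; note that it swaps in `java.util.logging` purely so it compiles without external dependencies, whereas the Storm code quoted above uses slf4j's `LoggerFactory.getLogger(...)`.

```java
import java.util.logging.Logger;

// Hypothetical class. Storm declares loggers via slf4j; java.util.logging is
// used here only so the sketch has no external dependencies.
final class WindowingTopologyExample {
    // Constant-style upper-case name for the static final logger (LOG, not log).
    private static final Logger LOG = Logger.getLogger(WindowingTopologyExample.class.getName());

    // Exposes the logger name so the declaration can be checked.
    static String loggerName() {
        return LOG.getName();
    }
}
```

In the patch itself this would mean renaming `log` to `LOG` in declarations such as the `HBaseWindowsStore` and `TridentWindowingInmemoryStoreTopology` loggers quoted above, along with every call site.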

