[GitHub] storm issue #2285: Change OracleJDK7 to OpenJDK7 in Travis CI build

2017-08-18 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2285
  
+1, good find


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady

2017-08-18 Thread ZacharyThomas
Github user ZacharyThomas commented on the issue:

https://github.com/apache/storm/pull/2280
  
@HeartSaVioR It was fun. I encountered this while trying to debug the 
exclamation topology dying on a new cluster (see: 
https://mail-archives.apache.org/mod_mbox/storm-user/201708.mbox/%3CCA%2BAmaO0oP5imSgs7nXboM_VQva0PzoQNGGURG%2BfR3rySystm1g%40mail.gmail.com%3E).

If you have any suggestions for that, they'd be much appreciated. 




[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2280
  
@ZacharyThomas Just updated. Thanks again for contributing!




[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady

2017-08-18 Thread ZacharyThomas
Github user ZacharyThomas commented on the issue:

https://github.com/apache/storm/pull/2280
  
@HeartSaVioR Thanks for the merge. My JIRA username is actking. 
Sorry, I couldn't find anywhere to set the assignee on JIRA. 




[GitHub] storm pull request #2286: fix not poll bug when use manual commit

2017-08-18 Thread pczb
GitHub user pczb opened a pull request:

https://github.com/apache/storm/pull/2286

Fix a bug where the spout does not poll when manual commit is used



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pczb/storm master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2286.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2286


commit a5d333954cade20fe2ee3345e1c27c29528d72e6
Author: pczb 
Date:   2017-08-19T04:01:46Z

fix not poll bug when use manual commit






[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2280
  
@ZacharyThomas 
Thanks for the patch. I merged this, but I can't update the assignee on the issue 
(https://issues.apache.org/jira/browse/STORM-2500). 

Could you let me know your Apache JIRA ID? Thanks in advance!




[GitHub] storm pull request #2280: STORM-2500: remove call to waitUntilReady

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2280




[GitHub] storm issue #2279: Nitpicky cosmetic change - use "Uptime" instead of "UpTim...

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2279
  
Thanks @vitaliyf, I merged this into master.




[GitHub] storm pull request #2279: Nitpicky cosmetic change - use "Uptime" instead of...

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2279




[GitHub] storm issue #2278: Quick fix: remove duplicated IStormClusterState object in...

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2278
  
Thanks @Ethanlm, I merged this into master.




[GitHub] storm pull request #2278: Quick fix: remove duplicated IStormClusterState ob...

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2278




[GitHub] storm issue #2276: Fix typos in Worker.java

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2276
  
Thanks @ruili-kekeke, I merged this into master.




[GitHub] storm pull request #2276: Fix typos in Worker.java

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2276




[GitHub] storm pull request #2275: STORM-2692: Load only configs specific to the topo...

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2275




[GitHub] storm pull request #2267: STORM-2682: Fix issues with null owner on rolling ...

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2267




[GitHub] storm pull request #2266: STORM-2682: Fixed issues with null owner

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2266




[GitHub] storm pull request #2285: Change OracleJDK7 to OpenJDK7 in Travis CI build

2017-08-18 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/storm/pull/2285

Change OracleJDK7 to OpenJDK7 in Travis CI build

We've been seeing build failures on all PRs against the 1.x versions because 
the build uses Oracle JDK 7, which Oracle has withdrawn.
The same issue has been filed with Travis CI; see the comment here: 
https://github.com/travis-ci/travis-ci/issues/7884#issuecomment-308451879

I changed OracleJDK7 to OpenJDK7 and verified the build against my 
fork: https://travis-ci.org/HeartSaVioR/storm/builds/266178126

This should be backported to all 1.x version-line branches.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/storm fix-travis-ci-oracle-7-not-supported

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2285.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2285


commit 28e93828c5e4f6ce3d5e5d515e96fa9502d50d6b
Author: Jungtaek Lim 
Date:   2017-08-19T01:28:41Z

Travis CI can't support OracleJDK7 as Oracle's withdrawal

* https://github.com/travis-ci/travis-ci/issues/7884#issuecomment-308451879






[GitHub] storm issue #2284: [STORM-2695] Fix in docs: BlobStore uncompress argument s...

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2284
  
+1 Nice finding.




[GitHub] storm issue #2268: STORM-2689: Simplify dependency configuration for storm-k...

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2268
  
@srdo +1 to removing the DRPC demo from the other examples rather than from 
storm-starter. Please go ahead. Thanks.




[GitHub] storm issue #2278: Quick fix: remove duplicated IStormClusterState object in...

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2278
  
+1 Nice finding.




[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2280
  
+1
This fixes what #2142 already fixed, but since there has been no activity on 
#2142, I'll just merge this.
Travis CI is failing on all JDK7 builds, so the failures are not related to this change.




[GitHub] storm issue #2279: Nitpicky cosmetic change - use "Uptime" instead of "UpTim...

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2279
  
+1




[GitHub] storm issue #2283: Quick fix confusing exception message in ObjectReader.jav...

2017-08-18 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2283
  
+1 Nice finding.




[GitHub] storm issue #2283: Quick fix confusing exception message in ObjectReader.jav...

2017-08-18 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2283
  
+1




[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

2017-08-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2282#discussion_r133973275
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
@@ -0,0 +1,48 @@
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
+ */
+public interface KafkaTupleListener extends Serializable {
--- End diff --

It might be useful to have a prepare method on this interface too, in case 
implementations need to instantiate something that can't be serialized, e.g. a 
KafkaProducer. I'm also wondering whether it would be useful to be able to emit 
dead tuples to a different stream.




[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

2017-08-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2282#discussion_r133974197
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -73,7 +73,9 @@
 // Strategy to determine the fetch offset of the first realized by the spout upon activation
 private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  
 // Class that has the logic to handle tuple failure
-private transient KafkaSpoutRetryService retryService;  
+private transient KafkaSpoutRetryService retryService;
+// Handles the events off a tuple
--- End diff --

Nit: Rephrase as "Handles tuple events (emit, ack, etc.)"; I don't think the 
current wording makes it clear what this field does.




[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

2017-08-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2282#discussion_r133971254
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
@@ -0,0 +1,48 @@
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
+ */
+public interface KafkaTupleListener extends Serializable {
+
+/**
+ * Called when the tuple is emitted.
+ *
+ * @param tuple the storm tuple.
+ * @param msgId The id of the tuple in the spout.
+ */
+void onEmit(List tuple, KafkaSpoutMessageId msgId);
+
+
+/**
+ * Called when a tuple is acked.
+ *
+ * @param msgId The id of the tuple in the spout.
+ */
+void onAck(KafkaSpoutMessageId msgId);
+
+/**
+ * Called when a fail reaches the spout, but the Kafka record does not belong to the spout anymore.
--- End diff --

Nit: The Kafka record belongs to a partition that is no longer assigned to 
the spout.
I'm wondering what this hook would be useful for. The record will be 
replayed by a different task if using at-least-once; otherwise it'll be 
dropped. If the reason it is here is to allow implementations to clean 
up after emit, I think we should just have a general 
onPartitionsReassigned(assignedPartitions) instead. 
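A minimal Java sketch of the more general hook proposed above, under stated assumptions: `PartitionAwareListener` and `onPartitionsReassigned(Collection<String>)` are hypothetical names, and partitions are identified by plain strings such as `"topic-0"` rather than Kafka's `TopicPartition`, so the example has no dependencies. The listener drops bookkeeping for revoked partitions in one place instead of reacting to individual fails on partitions it no longer owns.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical listener that keeps per-partition state for emitted tuples.
class PartitionAwareListener {
    // Partition name (e.g. "topic-0") -> ids of tuples emitted but not yet acked.
    private final Map<String, Set<String>> pendingByPartition = new HashMap<>();

    void onEmit(String partition, String msgId) {
        pendingByPartition.computeIfAbsent(partition, p -> new HashSet<>()).add(msgId);
    }

    void onAck(String partition, String msgId) {
        Set<String> pending = pendingByPartition.get(partition);
        if (pending != null) {
            pending.remove(msgId);
        }
    }

    // Hypothetical general hook: forget every partition that is no longer assigned.
    void onPartitionsReassigned(Collection<String> assignedPartitions) {
        // retainAll on the key-set view removes the corresponding map entries.
        pendingByPartition.keySet().retainAll(assignedPartitions);
    }

    int pendingCount(String partition) {
        Set<String> pending = pendingByPartition.get(partition);
        return pending == null ? 0 : pending.size();
    }
}
```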




[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...

2017-08-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2282#discussion_r133973452
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
@@ -0,0 +1,35 @@
+package org.apache.storm.kafka.spout;
+
+import java.util.List;
+
+public class EmptyKafkaTupleListener implements KafkaTupleListener {
+@Override
+public void onEmit(List tuple, KafkaSpoutMessageId msgId) {
+// empty method
--- End diff --

This comment isn't really necessary.




[GitHub] storm pull request #2284: [STORM-2695] Fix in docs: BlobStore uncompress arg...

2017-08-18 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2284

[STORM-2695] Fix in docs: BlobStore uncompress argument should be Boolean

See: https://issues.apache.org/jira/browse/STORM-2695

The docs for the "uncompress" argument need to be updated from String (e.g. 
"false") to Boolean (e.g. false).
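The flag is assumed here to sit in the per-blob settings of the `topology.blobstore.map` topology config, e.g. `{"key1":{"localname":"blob_file", "uncompress":false}}`. The Java sketch below builds such a settings map with a real `Boolean` and shows how a strict reader would reject the String `"false"`. `BlobstoreMapExample`, `readUncompress`, and treating a missing flag as `false` are assumptions for illustration, not Storm's code.

```java
import java.util.HashMap;
import java.util.Map;

class BlobstoreMapExample {
    // Builds per-blob settings like {"localname": "blob_file", "uncompress": false}.
    static Map<String, Object> blobSettings(String localName, boolean uncompress) {
        Map<String, Object> settings = new HashMap<>();
        settings.put("localname", localName);
        // Autoboxed to Boolean.TRUE/FALSE, not the String "false".
        settings.put("uncompress", uncompress);
        return settings;
    }

    // Hypothetical strict reader: only a Boolean is accepted; a missing flag is assumed to mean false.
    static boolean readUncompress(Map<String, Object> settings) {
        Object value = settings.get("uncompress");
        if (value == null) {
            return false;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        throw new IllegalArgumentException(
                "uncompress must be a Boolean but was " + value.getClass().getSimpleName());
    }
}
```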

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm STORM-2695

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2284


commit aede02eb34c5c8078a3d3bdbab38b4dac65f2411
Author: Ethan Li 
Date:   2017-08-18T14:21:45Z

[STORM-2695] Fix in docs: BlobStore uncompress argument should be Boolean






[GitHub] storm pull request #2283: Quick fix confusing exception message in ObjectRea...

2017-08-18 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2283

Quick fix confusing exception message in ObjectReader.java

An exception message like `Don't know how to convert  false + to boolean` 
can be confusing, so I deleted the `+`.
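For context, a hypothetical Java helper in the same spirit as the conversion that produces this message (not Storm's actual `ObjectReader` source) shows the intended wording once the stray `+` is gone: the value is concatenated directly between two plain string literals.

```java
class BooleanReader {
    // Hypothetical conversion helper; not Storm's ObjectReader source.
    static boolean getBoolean(Object o, boolean defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Boolean) {
            return (Boolean) o;
        }
        // The value sits between two plain literals, so no stray "+" reaches the message.
        throw new IllegalArgumentException("Don't know how to convert " + o + " to boolean");
    }
}
```

With this helper, `getBoolean("false", true)` throws an `IllegalArgumentException` whose message is `Don't know how to convert false to boolean`.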

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm TYPO-IN-ObjectReader-CLASS

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2283.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2283


commit 8ed69fdeafc4cce8c1b11a42be0de6de309a1f8e
Author: Ethan Li 
Date:   2017-08-18T14:04:56Z

Quick fix confusing exception message in ObjectReader.java






[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133946525
  
--- Diff: docs/Stream-API.md ---
@@ -276,6 +276,21 @@ PairStream scores = ...
 // list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])
 PairStream userScores = scores.window(...).groupByKey();
 ```
+
+###  coGroupByKey
+
+`coGroupByKey` Groups the values of this stream with the values having the same key from the other stream.
+
+```java
+// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
+PairStream stream1 = ...
+
+// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)
+PairStream stream2 = ...
--- End diff --

This can be `PairStream stream1` to match the comment above.




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133947663
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor extends BaseProcessor> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap firstMap = ArrayListMultimap.create();
+private final Multimap secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
+immutableFirstMap.asMap().forEach((key, values) -> {
+context.forward(Pair.of(key, Pair.of(values, 
secondMap.removeAll(key;
+});
+
+Multimap immutableSecondMap = ImmutableMultimap.copyOf(secondMap);
--- End diff --

Why is a copy required here? Can't you operate directly on firstMap 
and secondMap?
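One possible way to avoid the `ImmutableMultimap` copies is to hand off per-key lists that the processor stops referencing once they are forwarded. The Java sketch below deliberately swaps Guava's `Multimap` for plain `HashMap<K, List<V>>` buffers to stay dependency-free, and `CoGroupBuffer`, `CoGroupForwarder`, and `forwardAndClear` are hypothetical names. It models only the windowed path, where values are forwarded and then cleared in `finish()`. A caveat worth checking against the real code: the values of Guava's `Multimap.asMap()` are live views of the multimap, so forwarding them directly and then calling `clear()` may empty collections that downstream code still holds, which could be why the copies were added.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical callback standing in for context.forward(Pair.of(key, Pair.of(first, second))).
interface CoGroupForwarder<K, V1, V2> {
    void forward(K key, List<V1> first, List<V2> second);
}

// Hypothetical, Guava-free buffer for the windowed co-group path.
class CoGroupBuffer<K, V1, V2> {
    private final Map<K, List<V1>> firstMap = new HashMap<>();
    private final Map<K, List<V2>> secondMap = new HashMap<>();

    void addFirst(K key, V1 value) {
        firstMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void addSecond(K key, V2 value) {
        secondMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Forwards every key once, then empties both buffers; no defensive copies are made.
    void forwardAndClear(CoGroupForwarder<K, V1, V2> forwarder) {
        for (Map.Entry<K, List<V1>> entry : firstMap.entrySet()) {
            // remove() hands off the second-stream list and leaves only keys unique to stream two.
            List<V2> second = secondMap.remove(entry.getKey());
            forwarder.forward(entry.getKey(), entry.getValue(),
                    second != null ? second : Collections.<V2>emptyList());
        }
        for (Map.Entry<K, List<V2>> entry : secondMap.entrySet()) {
            forwarder.forward(entry.getKey(), Collections.<V1>emptyList(), entry.getValue());
        }
        // clear() drops the map entries only; the forwarded ArrayList objects keep their contents.
        firstMap.clear();
        secondMap.clear();
    }
}
```

Because later `add*` calls after a clear create fresh lists, anything already forwarded stays unchanged without needing a copy.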




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133947567
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor extends BaseProcessor> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap firstMap = ArrayListMultimap.create();
+private final Multimap secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
--- End diff --

Why is a copy required here? Can't you operate directly on firstMap 
and secondMap?




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133946658
  
--- Diff: docs/Stream-API.md ---
@@ -276,6 +276,21 @@ PairStream scores = ...
 // list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])
 PairStream userScores = scores.window(...).groupByKey();
 ```
+
+###  coGroupByKey
+
+`coGroupByKey` Groups the values of this stream with the values having the same key from the other stream.
+
+```java
+// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
+PairStream stream1 = ...
+
+// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)
+PairStream stream2 = ...
+
+// the co-grouped values per key in the last window, e.g. (k1, ([v1], [x1, x2]), (k2, ([v2, v3], [])), (k3, ([], [x3]))
+PairStream userScores = stream1.window(...).coGroupByKey(stream2);
--- End diff --

This can be `PairStream coGroupedStream =`.
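The expected output in the doc example above can be checked against a plain-collections model of co-grouping. This Java sketch is not Storm's Streams API: it co-groups two finite lists of key-value entries with no windowing, and `CoGroupExample`, `Grouped`, and `pair` are hypothetical helpers. A `LinkedHashMap` keeps keys in first-seen order, so keys from the first stream come before keys that appear only in the second.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CoGroupExample {
    // Per-key result: values from the first stream, then values from the second stream.
    static final class Grouped<V1, V2> {
        final List<V1> first = new ArrayList<>();
        final List<V2> second = new ArrayList<>();

        @Override
        public String toString() {
            return "(" + first + ", " + second + ")";
        }
    }

    // Hypothetical helper to build a (key, value) pair.
    static <K, V> Map.Entry<K, V> pair(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Co-groups two finite lists of (key, value) pairs; keys keep first-seen order.
    static <K, V1, V2> Map<K, Grouped<V1, V2>> coGroupByKey(
            List<Map.Entry<K, V1>> stream1, List<Map.Entry<K, V2>> stream2) {
        Map<K, Grouped<V1, V2>> result = new LinkedHashMap<>();
        for (Map.Entry<K, V1> entry : stream1) {
            result.computeIfAbsent(entry.getKey(), k -> new Grouped<>()).first.add(entry.getValue());
        }
        for (Map.Entry<K, V2> entry : stream2) {
            result.computeIfAbsent(entry.getKey(), k -> new Grouped<>()).second.add(entry.getValue());
        }
        return result;
    }
}
```

With the doc's inputs, (k1, v1), (k2, v2), (k2, v3) and (k1, x1), (k1, x2), (k3, x3), `coGroupByKey(stream1, stream2).toString()` gives `{k1=([v1], [x1, x2]), k2=([v2, v3], []), k3=([], [x3])}`, matching the comment in the diff.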




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133946510
  
--- Diff: docs/Stream-API.md ---
@@ -276,6 +276,21 @@ PairStream scores = ...
 // list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])
 PairStream userScores = scores.window(...).groupByKey();
 ```
+
+###  coGroupByKey
+
+`coGroupByKey` Groups the values of this stream with the values having the same key from the other stream.
+
+```java
+// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
+PairStream stream1 = ...
--- End diff --

can be `PairStream stream1` to match the comment above.


---


[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133947083
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/PairStream.java ---
@@ -380,6 +382,26 @@
 return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
 }
 
+/**
+ * Groups the values of this stream with the values having the same key from the other stream.
+ * If stream1 has values - (k1, v1), (k2, v2), (k2, v3)
+ * and stream2 has values - (k1, x1), (k1, x2), (k3, x3) *
--- End diff --

please format it so that it's easy to read in the javadocs, and remove the extra 
`*` at the end of this line.
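One possible layout for that javadoc: each example on its own `<pre>{@code ...}</pre>` line and no trailing `*`. The `coGroup` helper below is a hypothetical batch version of the same semantics, not the `PairStream` method, included only so the example in the comment can be checked:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical batch helper, used only to show the javadoc layout and check its example.
final class CoGroupExample {
    private CoGroupExample() {
    }

    /**
     * Groups the values of the first stream with the values having the same key in the
     * second stream.
     *
     * <p>If the first stream has the values
     * <pre>{@code (k1, v1), (k2, v2), (k2, v3)}</pre>
     * and the second stream has the values
     * <pre>{@code (k1, x1), (k1, x2), (k3, x3)}</pre>
     * the co-grouped values are
     * <pre>{@code (k1, ([v1], [x1, x2])), (k2, ([v2, v3], [])), (k3, ([], [x3]))}</pre>
     *
     * @param first  the (key, value) pairs of the first stream
     * @param second the (key, value) pairs of the second stream
     * @return for each key, in key order, the first-stream values and the second-stream values
     */
    static <K extends Comparable<? super K>, V1, V2> Map<K, Map.Entry<List<V1>, List<V2>>> coGroup(
            List<? extends Map.Entry<K, V1>> first, List<? extends Map.Entry<K, V2>> second) {
        // TreeMap keeps the keys sorted so the output order is deterministic.
        Map<K, Map.Entry<List<V1>, List<V2>>> groups = new TreeMap<>();
        for (Map.Entry<K, V1> pair : first) {
            groupFor(groups, pair.getKey()).getKey().add(pair.getValue());
        }
        for (Map.Entry<K, V2> pair : second) {
            groupFor(groups, pair.getKey()).getValue().add(pair.getValue());
        }
        return groups;
    }

    // Returns the (firstValues, secondValues) lists for a key, creating empty ones if absent.
    private static <K, V1, V2> Map.Entry<List<V1>, List<V2>> groupFor(
            Map<K, Map.Entry<List<V1>, List<V2>>> groups, K key) {
        Map.Entry<List<V1>, List<V2>> group = groups.get(key);
        if (group == null) {
            group = new SimpleImmutableEntry<List<V1>, List<V2>>(
                    new ArrayList<V1>(), new ArrayList<V2>());
            groups.put(key, group);
        }
        return group;
    }
}
```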


---


[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133948483
  
--- Diff: storm-client/test/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessorTest.java ---
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.processors;
+
+import org.apache.curator.shaded.com.google.common.collect.ImmutableBiMap;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMultimap;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Arrays;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class CoGroupByKeyProcessorTest {
+private CoGroupByKeyProcessor coGroupByKeyProcessor;
+private String firstStream = "first";
+private String secondStream = "second";
+private List>> res = new ArrayList<>();
+
+private ProcessorContext context = new ProcessorContext() {
+@Override
+public  void forward(T input) {
+res.add((Pair>)input);
+}
+
+@Override
+public  void forward(T input, String stream) {
+}
+
+@Override
+public boolean isWindowed() {
+return true;
+}
+
+@Override
+public Set getWindowedParentStreams() {
+return null;
+}
+};
+
+private List> firstKeyValeus = Arrays.asList(
+Pair.of(2, 4),
+Pair.of(5, 25),
+Pair.of(7, 49),
+Pair.of(7, 87)
+);
+
+private List> secondKeyValues = Arrays.asList(
+Pair.of(1, 1),
+Pair.of(2, 8),
+Pair.of(5, 125),
+Pair.of(5,50),
+Pair.of(6, 216)
+
+);
+
+@Test
+public void testCoGroupByKey() throws Exception {
+coGroupByKeyProcessor = new CoGroupByKeyProcessor<>(firstStream, secondStream);
+processValues();
+List>> result = new ArrayList<>();
--- End diff --

result -> expected
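For the data in this test, the co-grouped values per key would be 1 → ([], [1]), 2 → ([4], [8]), 5 → ([25], [125, 50]), 6 → ([], [216]) and 7 → ([49, 87], []), assuming values keep their arrival order within a key. If the processor forwards keys in hash-map order, comparing `expected` and `res` as plain lists may be order-sensitive. A small sketch of an order-insensitive check; `ResultAssertions` is a hypothetical helper, not part of Storm or JUnit, and it assumes the element type implements `equals`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical test helper: compares two result lists by key, ignoring the order
// in which the keys were forwarded. Elements are compared with equals().
final class ResultAssertions {
    private ResultAssertions() {
    }

    // Indexes results by key; a co-grouping processor should forward each key once,
    // so a repeated key is reported as a failure.
    static <T, K> Map<K, T> byKey(List<T> results, Function<? super T, ? extends K> keyOf) {
        Map<K, T> indexed = new HashMap<>();
        for (T result : results) {
            K key = keyOf.apply(result);
            if (indexed.containsKey(key)) {
                throw new AssertionError("key forwarded more than once: " + key);
            }
            indexed.put(key, result);
        }
        return indexed;
    }

    static <T, K> void assertSameIgnoringOrder(
            List<T> expected, List<T> actual, Function<? super T, ? extends K> keyOf) {
        Map<K, T> want = byKey(expected, keyOf);
        Map<K, T> got = byKey(actual, keyOf);
        if (!want.equals(got)) {
            throw new AssertionError("expected " + want + " but was " + got);
        }
    }
}
```

The key extractor is passed in rather than assumed, so the same helper could index whatever pair type the processor forwards.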


---