[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-25 Thread kamleshbhatt
Github user kamleshbhatt closed the pull request at:

https://github.com/apache/storm/pull/2233


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r134441630
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap<K, V1> immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
--- End diff --

You can take a copy of just the values rather than copying the entire map.

`context.forward(Pair.of(key, Pair.of(new ArrayList<>(values), secondMap.removeAll(key))));`




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r134441647
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap<K, V1> immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
+immutableFirstMap.asMap().forEach((key, values) -> {
+context.forward(Pair.of(key, Pair.of(values, secondMap.removeAll(key))));
+});
+
+Multimap<K, V2> immutableSecondMap = ImmutableMultimap.copyOf(secondMap);
--- End diff --

You can take a copy of just the values rather than copying the entire map.

`context.forward(Pair.of(key, Pair.of(firstMap.removeAll(key), new ArrayList<>(values))));`





[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-20 Thread kamleshbhatt
Github user kamleshbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r134122087
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap<K, V1> immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
+immutableFirstMap.asMap().forEach((key, values) -> {
+context.forward(Pair.of(key, Pair.of(values, secondMap.removeAll(key))));
+});
+
+Multimap<K, V2> immutableSecondMap = ImmutableMultimap.copyOf(secondMap);
--- End diff --

If the original map is used, `firstMap.clear()` and `secondMap.clear()` in
`finish()` would clear the contents of the output.
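
The aliasing concern described here can be reproduced without Storm or Guava. The sketch below is a stand-in, not the PR's code: a plain `java.util` list plays the role of a live per-key value collection, and the names `AliasingSketch`, `forwardLive`, and `forwardCopy` are hypothetical. It assumes, as the comment states, that the forwarded collection is the same object that `finish()` later clears.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the processor's buffering; not Storm or Guava code.
final class AliasingSketch {

    // Hands out the live buffer itself, then clears it (as finish() clears the maps).
    // The receiver ends up holding the same, now empty, list.
    static List<Integer> forwardLive(List<Integer> buffer) {
        List<Integer> forwarded = buffer;
        buffer.clear();
        return forwarded;
    }

    // Hands out a copy of the values, then clears the buffer.
    // The receiver keeps the values.
    static List<Integer> forwardCopy(List<Integer> buffer) {
        List<Integer> forwarded = new ArrayList<>(buffer);
        buffer.clear();
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(forwardLive(new ArrayList<>(Arrays.asList(49, 87)))); // prints []
        System.out.println(forwardCopy(new ArrayList<>(Arrays.asList(49, 87)))); // prints [49, 87]
    }
}
```

Under that assumption, copying only the values being forwarded is enough to protect the output from a later `clear()`.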




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-20 Thread kamleshbhatt
Github user kamleshbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r134122065
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap<K, V1> immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
--- End diff --

If I don't copy, `firstMap.clear()` and `secondMap.clear()` in `finish()`
would clear the contents of the output.




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133946525
  
--- Diff: docs/Stream-API.md ---
@@ -276,6 +276,21 @@ PairStream scores = ...
 // list of scores per user in the last window, e.g. ("alice", [10, 11, 
13]), ("bob", [15, 20])
 PairStream userScores =  
scores.window(...).groupByKey(); 
 ```
+
+###  coGroupByKey
+
+`coGroupByKey` groups the values of this stream with the values having the same key from the other stream.
+
+```java
+// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
+PairStream stream1 = ...
+
+// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)
+PairStream stream2 = ...
--- End diff --

This can be `PairStream stream1` to match the comment above.




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133947663
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap<K, V1> immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
+immutableFirstMap.asMap().forEach((key, values) -> {
+context.forward(Pair.of(key, Pair.of(values, secondMap.removeAll(key))));
+});
+
+Multimap<K, V2> immutableSecondMap = ImmutableMultimap.copyOf(secondMap);
--- End diff --

Why is a copy required here? Can't you operate directly out of the firstMap 
and secondMap?




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133947567
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap<K, V1> immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
--- End diff --

Why is a copy required here? Can't you operate directly out of the firstMap 
and secondMap?




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133946658
  
--- Diff: docs/Stream-API.md ---
@@ -276,6 +276,21 @@ PairStream scores = ...
 // list of scores per user in the last window, e.g. ("alice", [10, 11, 
13]), ("bob", [15, 20])
 PairStream userScores =  
scores.window(...).groupByKey(); 
 ```
+
+###  coGroupByKey
+
+`coGroupByKey` groups the values of this stream with the values having the same key from the other stream.
+
+```java
+// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
+PairStream stream1 = ...
+
+// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)
+PairStream stream2 = ...
+
+// the co-grouped values per key in the last window, e.g. (k1, ([v1], [x1, x2])), (k2, ([v2, v3], [])), (k3, ([], [x3]))
+PairStream userScores = stream1.window(...).coGroupByKey(stream2);
--- End diff --

can be `PairStream coGroupedStream =`
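
To make the grouping in the doc comment concrete, here is a minimal sketch in plain Java that assumes nothing about Storm's internals. `CoGroupSketch`, `pair`, and `coGroup` are hypothetical names, and `java.util` collections stand in for the two streams and for the window's buffered contents. It uses the example values from the diff above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of co-grouping; not Storm's implementation.
final class CoGroupSketch {

    // Convenience factory for a (key, value) pair.
    static <K, V> Map.Entry<K, V> pair(K key, V value) {
        return new SimpleEntry<K, V>(key, value);
    }

    // An empty (firstValues, secondValues) holder for a newly seen key.
    private static <V1, V2> Map.Entry<List<V1>, List<V2>> emptyGroups() {
        return new SimpleEntry<List<V1>, List<V2>>(new ArrayList<V1>(), new ArrayList<V2>());
    }

    // Groups the values of both inputs by key. A key present on only one
    // side gets an empty list for the other side.
    static <K, V1, V2> Map<K, Map.Entry<List<V1>, List<V2>>> coGroup(
            List<Map.Entry<K, V1>> first, List<Map.Entry<K, V2>> second) {
        Map<K, Map.Entry<List<V1>, List<V2>>> out = new LinkedHashMap<>();
        for (Map.Entry<K, V1> e : first) {
            out.computeIfAbsent(e.getKey(), k -> CoGroupSketch.<V1, V2>emptyGroups())
               .getKey().add(e.getValue());
        }
        for (Map.Entry<K, V2> e : second) {
            out.computeIfAbsent(e.getKey(), k -> CoGroupSketch.<V1, V2>emptyGroups())
               .getValue().add(e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> stream1 =
                Arrays.asList(pair("k1", "v1"), pair("k2", "v2"), pair("k2", "v3"));
        List<Map.Entry<String, String>> stream2 =
                Arrays.asList(pair("k1", "x1"), pair("k1", "x2"), pair("k3", "x3"));
        coGroup(stream1, stream2).forEach((key, groups) ->
                System.out.println("(" + key + ", (" + groups.getKey() + ", " + groups.getValue() + "))"));
        // prints:
        // (k1, ([v1], [x1, x2]))
        // (k2, ([v2, v3], []))
        // (k3, ([], [x3]))
    }
}
```

The sketch builds a fresh list per key, so nothing it returns is tied to a buffer that is cleared later.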




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133946510
  
--- Diff: docs/Stream-API.md ---
@@ -276,6 +276,21 @@ PairStream scores = ...
 // list of scores per user in the last window, e.g. ("alice", [10, 11, 
13]), ("bob", [15, 20])
 PairStream userScores =  
scores.window(...).groupByKey(); 
 ```
+
+###  coGroupByKey
+
+`coGroupByKey` groups the values of this stream with the values having the same key from the other stream.
+
+```java
+// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
+PairStream stream1 = ...
--- End diff --

This can be `PairStream stream1` to match the comment above.




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133947083
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/PairStream.java ---
@@ -380,6 +382,26 @@
 return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
 }
 
+/**
+ * Groups the values of this stream with the values having the same 
key from the other stream.
+ * If stream1 has values - (k1, v1), (k2, v2), (k2, v3)
+ * and stream2 has values - (k1, x1), (k1, x2), (k3, x3) *
--- End diff --

Please format this so that it's easy to read in the Javadoc, and remove the
extra `*` at the end of this line.




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-18 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r133948483
  
--- Diff: 
storm-client/test/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessorTest.java
 ---
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.processors;
+
+import org.apache.curator.shaded.com.google.common.collect.ImmutableBiMap;
+import 
org.apache.curator.shaded.com.google.common.collect.ImmutableMultimap;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Arrays;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class CoGroupByKeyProcessorTest {
+private CoGroupByKeyProcessor 
coGroupByKeyProcessor;
+private String firstStream = "first";
+private String secondStream = "second";
+private List>> res = 
new ArrayList<>();
+
+private ProcessorContext context = new ProcessorContext() {
+@Override
+public  void forward(T input) {
+res.add((Pair>)input);
+}
+
+@Override
+public  void forward(T input, String stream) {
+}
+
+@Override
+public boolean isWindowed() {
+return true;
+}
+
+@Override
+public Set getWindowedParentStreams() {
+return null;
+}
+};
+
+private List> firstKeyValeus = Arrays.asList(
+Pair.of(2, 4),
+Pair.of(5, 25),
+Pair.of(7, 49),
+Pair.of(7, 87)
+);
+
+private List> secondKeyValues = Arrays.asList(
+Pair.of(1, 1),
+Pair.of(2, 8),
+Pair.of(5, 125),
+Pair.of(5,50),
+Pair.of(6, 216)
+
+);
+
+@Test
+public void testCoGroupByKey() throws Exception {
+coGroupByKeyProcessor = new CoGroupByKeyProcessor<>(firstStream, 
secondStream);
+processValues();
+List>> result = new ArrayList<>();
--- End diff --

result -> expected




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-07 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r131659700
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/PairStream.java ---
@@ -380,6 +383,22 @@
 return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
 }
 
+/**
+ * group the values of this stream with the values having the same key 
from the other stream.
--- End diff --

group -> Groups




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-07 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r131680183
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+firstMap.asMap().forEach((key, values) -> {
+context.forward(Pair.of(key, Pair.of(values, secondMap.removeAll(key))));
+});
+
+secondMap.asMap().forEach((key, values) -> {
+context.forward(Pair.of(key, Pair.of(firstMap.removeAll(key), values)));
--- End diff --

Doesn't `firstMap.get(key)` work?




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-07 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r131682043
  
--- Diff: 
storm-client/test/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessorTest.java
 ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.processors;
+
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Arrays;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class CoGroupByKeyProcessorTest {
+private CoGroupByKeyProcessor 
coGroupByKeyProcessor;
+private String firstStream = "first";
+private String secondStream = "second";
+private List>> res = 
new ArrayList<>();
+
+private ProcessorContext context = new ProcessorContext() {
+@Override
+public  void forward(T input) {
+res.add((Pair>)input);
+}
+
+@Override
+public  void forward(T input, String stream) {
+}
+
+@Override
+public boolean isWindowed() {
+return true;
+}
+
+@Override
+public Set getWindowedParentStreams() {
+return null;
+}
+};
+
+private List> firstKeyValeus = Arrays.asList(
+Pair.of(2, 4),
+Pair.of(5, 25),
+Pair.of(7, 49),
+Pair.of(7, 87)
+);
+
+private List> secondKeyValues = Arrays.asList(
+Pair.of(1, 1),
+Pair.of(2, 8),
+Pair.of(5, 125),
+Pair.of(5,50),
+Pair.of(6, 216)
+
+);
+
+@Test
+public void testCoGroupByKey() throws Exception {
+coGroupByKeyProcessor = new CoGroupByKeyProcessor<>(firstStream, 
secondStream);
+processValues();
+List>> result = new ArrayList<>();
+Collection list1 = new ArrayList<>();
+list1.add(25);
+Collection list2 = new ArrayList<>();
+list2.add(125);
+list2.add(50);
+result.add(Pair.of(5, Pair.of(list1, list2)));
+list1.clear();
+list2.clear();
+list1.add(49);
+list1.add(87);
+result.add(Pair.of(7, Pair.of(list1, list2)));
+}
--- End diff --

I can't find any assertions here. Are you checking that the result is as 
expected?
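Beyond the missing assertion, the way the expected list is built looks fragile: `list1` and `list2` are cleared and reused after being stored, and a pair that holds a reference to them would see the change. The sketch below shows that aliasing effect with plain `java.util` types; `SimpleEntry` stands in for Storm's `Pair`, and the class and method names are made up for the illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ExpectedResultAliasing {
    // Reuses one list for every expected entry, like list1 in the test above.
    public static List<Map.Entry<Integer, List<Integer>>> buildWithSharedList() {
        List<Map.Entry<Integer, List<Integer>>> expected = new ArrayList<>();
        List<Integer> shared = new ArrayList<>();
        shared.add(25);
        expected.add(new SimpleEntry<Integer, List<Integer>>(5, shared));
        shared.clear();      // also empties the list already stored under key 5
        shared.add(49);
        shared.add(87);
        expected.add(new SimpleEntry<Integer, List<Integer>>(7, shared));
        return expected;
    }

    // Builds a fresh list per entry, so earlier entries keep their values.
    public static List<Map.Entry<Integer, List<Integer>>> buildWithFreshLists() {
        List<Map.Entry<Integer, List<Integer>>> expected = new ArrayList<>();
        expected.add(new SimpleEntry<Integer, List<Integer>>(
                5, new ArrayList<Integer>(Arrays.asList(25))));
        expected.add(new SimpleEntry<Integer, List<Integer>>(
                7, new ArrayList<Integer>(Arrays.asList(49, 87))));
        return expected;
    }

    public static void main(String[] args) {
        System.out.println(buildWithSharedList());  // prints [5=[49, 87], 7=[49, 87]]
        System.out.println(buildWithFreshLists());  // prints [5=[25], 7=[49, 87]]
    }
}
```

In the JUnit test itself, a closing `assertEquals(result, res)` would perform the comparison, assuming `Pair` implements `equals`.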




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-07 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r131690416
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/PairStream.java ---
@@ -380,6 +383,22 @@
 return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
 }
 
+/**
+ * group the values of this stream with the values having the same key from the other stream.
--- End diff --

Maybe add a snippet here and in the Streams API doc to make it clear:

```
If stream1 has values - (k1, v1), (k2, v2), (k2, v3) 
and stream2 has values - (k1, x1), (k1, x2), (k3, x3)

Then the co-grouped stream would contain -
(k1, ([v1], [x1, x2])), (k2, ([v2, v3], [])), (k3, ([], [x3]))
```
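The same example can be worked through in code. This is only a sketch of the co-group semantics on in-memory data, not the Storm implementation: `CoGroupExample` and `coGroup` are hypothetical names, keys and values are plain strings, and each result value is a two-element list rather than a `Pair`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CoGroupExample {
    // Groups both inputs by key. Each result value is a two-element list:
    // index 0 holds the values from the first input, index 1 those from the second.
    public static Map<String, List<List<String>>> coGroup(String[][] first, String[][] second) {
        Map<String, List<List<String>>> grouped = new LinkedHashMap<>();
        addAll(grouped, first, 0);
        addAll(grouped, second, 1);
        return grouped;
    }

    // Files every (key, value) row under its key, on the given side (0 or 1).
    private static void addAll(Map<String, List<List<String>>> grouped, String[][] rows, int side) {
        for (String[] row : rows) {
            List<List<String>> sides = grouped.get(row[0]);
            if (sides == null) {
                sides = new ArrayList<>();
                sides.add(new ArrayList<String>());
                sides.add(new ArrayList<String>());
                grouped.put(row[0], sides);
            }
            sides.get(side).add(row[1]);
        }
    }

    public static void main(String[] args) {
        String[][] stream1 = {{"k1", "v1"}, {"k2", "v2"}, {"k2", "v3"}};
        String[][] stream2 = {{"k1", "x1"}, {"k1", "x2"}, {"k3", "x3"}};
        // prints {k1=[[v1], [x1, x2]], k2=[[v2, v3], []], k3=[[], [x3]]}
        System.out.println(coGroup(stream1, stream2));
    }
}
```

A key that appears in only one input still produces an entry, with an empty list on the other side, which is what distinguishes co-grouping from an inner join.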




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-31 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r130441519
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.apache.storm.streams.tuple.Tuple3;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor extends 
BaseProcessor> implements BatchProcessor {
+private final PairValueJoiner 
valueJoiner;
+private final String firstStream;
+private final String secondStream;
+private final List> firstRows = new ArrayList<>();
+private final List> secondRows = new ArrayList<>();
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream, 
PairValueJoiner valueJoiner) {
+this.valueJoiner = valueJoiner;
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+Pair pair = Pair.of(key, val);
+firstRows.add(pair);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+Pair pair = Pair.of(key, val);
+secondRows.add(pair);
+}
+if (!context.isWindowed()) {
+joinAndForward(firstRows, secondRows);
+}
+
+}
+
+@Override
+public void finish() {
+joinAndForward(firstRows, secondRows);
+firstRows.clear();
+secondRows.clear();
+}
+
+private void joinAndForward(List> firstRows, List> secondRows) {
+for (Tuple3 res : 
join(getJoinTable(firstRows), getJoinTable(secondRows))) {
+context.forward(Pair.of(res._1, valueJoiner.apply(res._2, 
res._3)));
+
+}
+}
+
+/*
+ * returns list of Tuple3 (key, val from table, val from row)
+ */
+
+private  List> 
join(Multimap tab1, Multimap tab2) {
+List> res = new 
ArrayList<>();
+for (K key : tab1.keys()) {
--- End diff --

`keys()` returns a multiset, so a key appears once for every value it maps 
to. I am not sure how this would work; you will end up with duplicate 
results for the same key.
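To make the concern concrete, here is a small sketch of the two ways of walking the keys. It uses a plain `java.util.Map` of lists as a stand-in for the Guava multimap, and `keysPerEntry` and `distinctKeys` are hypothetical helpers that mimic what `keys()` and `keySet()` would yield.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeysVersusKeySet {
    // One key per stored value, mirroring the contents of a multiset of keys.
    public static <K, V> List<K> keysPerEntry(Map<K, List<V>> table) {
        List<K> keys = new ArrayList<>();
        for (Map.Entry<K, List<V>> entry : table.entrySet()) {
            for (int i = 0; i < entry.getValue().size(); i++) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    // Each key exactly once, mirroring a key set.
    public static <K, V> List<K> distinctKeys(Map<K, List<V>> table) {
        return new ArrayList<>(table.keySet());
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> tab1 = new LinkedHashMap<>();
        tab1.put(5, Arrays.asList(25));
        tab1.put(7, Arrays.asList(49, 87));

        System.out.println(keysPerEntry(tab1));  // prints [5, 7, 7]
        System.out.println(distinctKeys(tab1));  // prints [5, 7]
    }
}
```

A loop over the first list would process key 7 twice and emit its group twice; iterating over the distinct keys (in Guava, `keySet()` or `asMap()`) avoids that.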




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-31 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r130441506
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.apache.storm.streams.tuple.Tuple3;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor extends 
BaseProcessor> implements BatchProcessor {
+private final PairValueJoiner 
valueJoiner;
+private final String firstStream;
+private final String secondStream;
+private final List> firstRows = new ArrayList<>();
+private final List> secondRows = new ArrayList<>();
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream, 
PairValueJoiner valueJoiner) {
+this.valueJoiner = valueJoiner;
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+Pair pair = Pair.of(key, val);
+firstRows.add(pair);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+Pair pair = Pair.of(key, val);
+secondRows.add(pair);
+}
+if (!context.isWindowed()) {
+joinAndForward(firstRows, secondRows);
+}
+
+}
+
+@Override
+public void finish() {
+joinAndForward(firstRows, secondRows);
+firstRows.clear();
+secondRows.clear();
+}
+
+private void joinAndForward(List> firstRows, List> secondRows) {
+for (Tuple3 res : 
join(getJoinTable(firstRows), getJoinTable(secondRows))) {
--- End diff --

Once you maintain the two multi-maps, you don't need the join step. You 
could iterate over them directly and forward the entries.
```java
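// First pass: every key in mm1, together with its matches, which are removed from mm2.
mm1.asMap().forEach((key, values) -> {
    context.forward(Pair.of(key, Pair.of(values, mm2.removeAll(key))));
});

// Second pass: only keys missing from mm1 remain in mm2, so mm1.get(key) is empty.
mm2.asMap().forEach((key, values) -> {
    context.forward(Pair.of(key, Pair.of(mm1.get(key), values)));
});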
```
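A runnable approximation of the two passes, for anyone who wants to try the idea outside Storm. The assumptions are significant: plain `java.util.Map` instances replace the multimaps, `Map.remove` stands in for `removeAll`, and instead of calling `context.forward` the hypothetical `forwardAll` method collects a string per forwarded pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoPassCoGroup {
    // Returns what would be forwarded, rendered as "key=(firstValues, secondValues)".
    // Note: mm2 is modified, just as removeAll modifies the multimap.
    public static List<String> forwardAll(Map<String, List<String>> mm1,
                                          Map<String, List<String>> mm2) {
        List<String> forwarded = new ArrayList<>();
        // Pass 1: every key of mm1, paired with its matches, which are removed from mm2.
        for (Map.Entry<String, List<String>> e : mm1.entrySet()) {
            List<String> matches = mm2.remove(e.getKey());
            if (matches == null) {
                matches = Collections.emptyList();
            }
            forwarded.add(e.getKey() + "=(" + e.getValue() + ", " + matches + ")");
        }
        // Pass 2: only keys absent from mm1 are left in mm2, so the lookup yields an empty list.
        for (Map.Entry<String, List<String>> e : mm2.entrySet()) {
            List<String> fromFirst = mm1.getOrDefault(e.getKey(), Collections.<String>emptyList());
            forwarded.add(e.getKey() + "=(" + fromFirst + ", " + e.getValue() + ")");
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Map<String, List<String>> mm1 = new LinkedHashMap<>();
        mm1.put("k1", Arrays.asList("v1"));
        mm1.put("k2", Arrays.asList("v2", "v3"));
        Map<String, List<String>> mm2 = new LinkedHashMap<>();
        mm2.put("k1", Arrays.asList("x1", "x2"));
        mm2.put("k3", Arrays.asList("x3"));

        // prints k1=([v1], [x1, x2]) then k2=([v2, v3], []) then k3=([], [x3])
        for (String line : forwardAll(mm1, mm2)) {
            System.out.println(line);
        }
    }
}
```

The removal in the first pass is what keeps a key present in both inputs from being forwarded a second time.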





[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-31 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r130434940
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.apache.storm.streams.tuple.Tuple3;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor extends 
BaseProcessor> implements BatchProcessor {
+private final PairValueJoiner 
valueJoiner;
--- End diff --

`Iterable`, `Iterable` instead of `Collection`.




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-31 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r130434631
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/PairStream.java ---
@@ -400,6 +422,18 @@
 return new PairStream<>(streamBuilder, joinNode);
 }
 
+private  PairStream coGroupByKeyPartition(PairStream otherStream,
+   
PairValueJoiner valueJoiner) {
+String firstStream = stream;
+String secondStream = otherStream.stream;
+Node joinNode = addProcessorNode(
--- End diff --

nit: coGroupNode




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-31 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r130435926
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.apache.storm.streams.tuple.Tuple3;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor extends 
BaseProcessor> implements BatchProcessor {
+private final PairValueJoiner 
valueJoiner;
+private final String firstStream;
+private final String secondStream;
+private final List> firstRows = new ArrayList<>();
--- End diff --

Why not just maintain two multi-maps and add directly? This list looks 
redundant.
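A rough sketch of what adding directly could look like, assuming integer keys and values and using `java.util.Map` with `computeIfAbsent` in place of a Guava multimap (where the equivalent would be a single `put(key, value)`). `DirectGrouping` is a hypothetical class, not the processor from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DirectGrouping {
    private final String firstStream;
    private final String secondStream;
    public final Map<Integer, List<Integer>> firstMap = new LinkedHashMap<>();
    public final Map<Integer, List<Integer>> secondMap = new LinkedHashMap<>();

    public DirectGrouping(String firstStream, String secondStream) {
        this.firstStream = firstStream;
        this.secondStream = secondStream;
    }

    // Files each incoming pair under its key right away; no intermediate row list.
    public void execute(int key, int value, String sourceStream) {
        if (sourceStream.equals(firstStream)) {
            firstMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else if (sourceStream.equals(secondStream)) {
            secondMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    public static void main(String[] args) {
        DirectGrouping processor = new DirectGrouping("first", "second");
        processor.execute(7, 49, "first");
        processor.execute(7, 87, "first");
        processor.execute(5, 125, "second");
        System.out.println(processor.firstMap);   // prints {7=[49, 87]}
        System.out.println(processor.secondMap);  // prints {5=[125]}
    }
}
```

With the values grouped on arrival, there is no separate step that converts a list of rows into a table before forwarding.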




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-31 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r130434218
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/PairStream.java ---
@@ -380,6 +383,25 @@
 return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
 }
 
+/**
+ * group the values of this stream with the values having the same key 
from the other stream.
+ * 
+ * Note: The parallelism of this stream is carried forward to the 
joined stream.
+ * 
+ *
+ * @param otherStream the other stream
+ * @param valueJoiner the {@link PairValueJoiner}
+ * @param  the type of the values resulting from the 
grouping
+ * @param the type of the values in the other stream
+ * @return the new stream
+ */
+public  PairStream coGroupByKey(PairStream 
otherStream,
+ 
PairValueJoiner valueJoiner) {
--- End diff --

In the case of Co-group it does not make much sense to expose the 
PairValueJoiner in the public API. If needed this can be an implementation 
detail.

So the API could be something like,
```java
public  PairStream> 
coGroupByKey(PairStream otherStream)
```




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-31 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r130429097
  
--- Diff: storm-client/src/jvm/org/apache/storm/streams/PairStream.java ---
@@ -380,6 +383,25 @@
 return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
 }
 
+/**
+ * group the values of this stream with the values having the same key from the other stream.
+ * 
+ * Note: The parallelism of this stream is carried forward to the joined stream.
--- End diff --

carried forward to the co-grouped stream.




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-31 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r130436004
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.apache.storm.streams.tuple.Tuple3;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor extends 
BaseProcessor> implements BatchProcessor {
+private final PairValueJoiner 
valueJoiner;
+private final String firstStream;
+private final String secondStream;
+private final List> firstRows = new ArrayList<>();
+private final List> secondRows = new ArrayList<>();
--- End diff --

Why not just maintain two multi-maps and add directly? This list looks 
redundant.





[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-07-20 Thread kamleshbhatt
GitHub user kamleshbhatt opened a pull request:

https://github.com/apache/storm/pull/2233

Storm 2258: Streams api - support CoGroupByKey



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kamleshbhatt/storm STORM-2258

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2233.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2233





