[GitHub] storm issue #2285: Change OracleJDK7 to OpenJDK7 in Travis CI build
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2285 +1, good find --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady
Github user ZacharyThomas commented on the issue: https://github.com/apache/storm/pull/2280 @HeartSaVioR It was fun. I encountered this while trying to debug the exclamation topology dying on a new cluster (see: https://mail-archives.apache.org/mod_mbox/storm-user/201708.mbox/%3CCA%2BAmaO0oP5imSgs7nXboM_VQva0PzoQNGGURG%2BfR3rySystm1g%40mail.gmail.com%3E). If you have any suggestions for that, that'd be great.
[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2280 @ZacharyThomas Just updated. Thanks again for contributing!
[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady
Github user ZacharyThomas commented on the issue: https://github.com/apache/storm/pull/2280 @HeartSaVioR Thanks for the merge. My username on jira should be actking. Sorry I couldn't find anywhere to set assignee on Jira.
[GitHub] storm pull request #2286: fix not poll bug when use manual commit
GitHub user pczb opened a pull request: https://github.com/apache/storm/pull/2286
fix not poll bug when use manual commit
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/pczb/storm master
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/storm/pull/2286.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #2286
commit a5d333954cade20fe2ee3345e1c27c29528d72e6
Author: pczb
Date: 2017-08-19T04:01:46Z
    fix not poll bug when use manual commit
[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2280 @ZacharyThomas Thanks for the patch. I merged this but can't update assignee on issue (https://issues.apache.org/jira/browse/STORM-2500). Could you let me know your Apache JIRA ID? Thanks in advance!
[GitHub] storm pull request #2280: STORM-2500: remove call to waitUntilReady
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2280
[GitHub] storm issue #2279: Nitpicky cosmetic change - use "Uptime" instead of "UpTim...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2279 Thanks @vitaliyf I merged into master.
[GitHub] storm pull request #2279: Nitpicky cosmetic change - use "Uptime" instead of...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2279
[GitHub] storm issue #2278: Quick fix: remove duplicated IStormClusterState object in...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2278 Thanks @Ethanlm, I merged into master.
[GitHub] storm pull request #2278: Quick fix: remove duplicated IStormClusterState ob...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2278
[GitHub] storm issue #2276: Fix typos in Worker.java
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2276 Thanks @ruili-kekeke, merged into master.
[GitHub] storm pull request #2276: Fix typos in Worker.java
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2276
[GitHub] storm pull request #2275: STORM-2692: Load only configs specific to the topo...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2275
[GitHub] storm pull request #2267: STORM-2682: Fix issues with null owner on rolling ...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2267
[GitHub] storm pull request #2266: STORM-2682: Fixed issues with null owner
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2266
[GitHub] storm pull request #2285: Change OracleJDK7 to OpenJDK7 in Travis CI build
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/storm/pull/2285
Change OracleJDK7 to OpenJDK7 in Travis CI build
We have an ongoing issue with build failures on all PRs against the 1.x version lines, because the builds use Oracle JDK 7, which Oracle has withdrawn. The same issue has been filed against Travis CI, and you can see the comment here: https://github.com/travis-ci/travis-ci/issues/7884#issuecomment-308451879
I changed OracleJDK7 to OpenJDK7 and checked the build result against my fork: https://travis-ci.org/HeartSaVioR/storm/builds/266178126
This should be ported back to all 1.x version line branches.
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/HeartSaVioR/storm fix-travis-ci-oracle-7-not-supported
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/storm/pull/2285.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #2285
commit 28e93828c5e4f6ce3d5e5d515e96fa9502d50d6b
Author: Jungtaek Lim
Date: 2017-08-19T01:28:41Z
    Travis CI can't support OracleJDK7 as Oracle's withdrawl
    * https://github.com/travis-ci/travis-ci/issues/7884#issuecomment-308451879
[GitHub] storm issue #2284: [STORM-2695] Fix in docs: BlobStore uncompress argument s...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2284 +1 Nice finding.
[GitHub] storm issue #2268: STORM-2689: Simplify dependency configuration for storm-k...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2268 @srdo +1 to remove DRPC demo in other examples rather than storm-starter. Please go ahead. Thanks.
[GitHub] storm issue #2278: Quick fix: remove duplicated IStormClusterState object in...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2278 +1 Nice finding.
[GitHub] storm issue #2280: STORM-2500: remove call to waitUntilReady
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2280 +1 This fixes what #2142 already fixed, but no activity is seen on #2142, so I'll just merge this. Travis CI fails on all JDK7 builds, so the failure is not related to this change.
[GitHub] storm issue #2279: Nitpicky cosmetic change - use "Uptime" instead of "UpTim...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2279 +1
[GitHub] storm issue #2283: Quick fix confusing exception message in ObjectReader.jav...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2283 +1 Nice finding.
[GitHub] storm issue #2283: Quick fix confusing exception message in ObjectReader.jav...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2283 +1
[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2282#discussion_r133973275
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
@@ -0,0 +1,48 @@
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
+ */
+public interface KafkaTupleListener extends Serializable {
--- End diff --
It might be useful to have a prepare method on this interface too, in case implementations need to instantiate something that can't be serialized, e.g. a KafkaProducer. I'm also wondering if it would be useful to be able to emit dead tuples to a different stream as well?
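The `prepare` suggestion in the comment above can be sketched as follows. This is a minimal sketch, not the storm-kafka-client API: `PreparableListener`, `ForwardingListener`, `FakeProducer` and `PrepareHookDemo` are hypothetical names, and `FakeProducer` merely stands in for a non-serializable resource such as a KafkaProducer. It shows that a `transient` field is lost when the listener is serialized, so a `prepare` hook is the natural place to re-create it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;

// Hypothetical stand-in for a non-serializable resource such as a KafkaProducer.
class FakeProducer {
    int sent;
}

// Hypothetical listener shape with a prepare hook; not the actual storm-kafka-client interface.
interface PreparableListener extends Serializable {
    void prepare(Map<String, Object> conf);

    void onEmit(String msgId);
}

class ForwardingListener implements PreparableListener {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null after deserialization.
    private transient FakeProducer producer;

    @Override
    public void prepare(Map<String, Object> conf) {
        // Re-create the non-serializable resource on the receiving side.
        producer = new FakeProducer();
    }

    @Override
    public void onEmit(String msgId) {
        producer.sent++;
    }

    boolean isPrepared() {
        return producer != null;
    }

    int sentCount() {
        return producer.sent;
    }
}

class PrepareHookDemo {
    // Serializes and deserializes an object, roughly what happens when a component is shipped to a worker.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `PrepareHookDemo.roundTrip(new ForwardingListener())` yields a listener whose producer is unset until `prepare` runs, which is the situation the reviewer's hook would address.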
[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2282#discussion_r133974197
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -73,7 +73,9 @@
     // Strategy to determine the fetch offset of the first realized by the spout upon activation
     private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
     // Class that has the logic to handle tuple failure
-    private transient KafkaSpoutRetryService retryService;
+    private transient KafkaSpoutRetryService retryService;
+    // Handles the events off a tuple
--- End diff --
Nit: Rephrase as "Handles tuple events (emit, ack etc.)". I don't think it is clear what the current comment means.
[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2282#discussion_r133971254
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---
@@ -0,0 +1,48 @@
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout.
+ */
+public interface KafkaTupleListener extends Serializable {
+
+    /**
+     * Called when the tuple is emitted.
+     *
+     * @param tuple the storm tuple.
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onEmit(List tuple, KafkaSpoutMessageId msgId);
+
+
+    /**
+     * Called when a tuple is acked.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onAck(KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when a fail reaches the spout, but the Kafka record does not belong to the spout anymore.
--- End diff --
Nit: The kafka record belongs to a partition that is not assigned to the spout anymore. I'm wondering what this hook would be useful for? The record will get replayed by a different task if using at-least-once, otherwise it'll be dropped. I think if the reason it is here is to allow implementations to clean up after emit, we should just have a general onPartitionsReassigned(assignedPartitions) instead.
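The general `onPartitionsReassigned(assignedPartitions)` hook proposed above could let an implementation clean up per-partition state in one place. The following is a rough sketch under stated assumptions: `InFlightTracker` is a hypothetical class, partitions are represented as plain strings rather than Kafka `TopicPartition` objects, and the method signatures are illustrative rather than taken from the pull request.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical listener that tracks emitted offsets per partition (illustrative only).
class InFlightTracker {
    private final Map<String, Set<Long>> inFlight = new HashMap<>();

    void onEmit(String partition, long offset) {
        inFlight.computeIfAbsent(partition, p -> new HashSet<>()).add(offset);
    }

    void onAck(String partition, long offset) {
        Set<Long> offsets = inFlight.get(partition);
        if (offsets != null) {
            offsets.remove(offset);
        }
    }

    // Hypothetical general hook: drop state for every partition that is no longer assigned.
    void onPartitionsReassigned(Collection<String> assignedPartitions) {
        inFlight.keySet().retainAll(new HashSet<>(assignedPartitions));
    }

    int inFlightCount(String partition) {
        Set<Long> offsets = inFlight.get(partition);
        return offsets == null ? 0 : offsets.size();
    }
}
```

With a hook of this shape, a listener would not need a separate callback for fails that arrive after a partition has moved to another task; the state for that partition is simply discarded at reassignment time.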
[GitHub] storm pull request #2282: STORM-2694: Add KafkaTupleListener to storm-kafka-...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2282#discussion_r133973452
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java ---
@@ -0,0 +1,35 @@
+package org.apache.storm.kafka.spout;
+
+import java.util.List;
+
+public class EmptyKafkaTupleListener implements KafkaTupleListener {
+    @Override
+    public void onEmit(List tuple, KafkaSpoutMessageId msgId) {
+        // empty method
--- End diff --
This comment isn't really necessary.
[GitHub] storm pull request #2284: [STORM-2695] Fix in docs: BlobStore uncompress arg...
GitHub user Ethanlm opened a pull request: https://github.com/apache/storm/pull/2284
[STORM-2695] Fix in docs: BlobStore uncompress argument should be Boolean
See: https://issues.apache.org/jira/browse/STORM-2695
We need to update the doc for "uncompress" argument from String (e.g. "false") to Boolean (e.g. false)
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/Ethanlm/storm STORM-2695
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/storm/pull/2284.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #2284
commit aede02eb34c5c8078a3d3bdbab38b4dac65f2411
Author: Ethan Li
Date: 2017-08-18T14:21:45Z
    [STORM-2695] Fix in docs: BlobStore uncompress argument should be Boolean
[GitHub] storm pull request #2283: Quick fix confusing exception message in ObjectRea...
GitHub user Ethanlm opened a pull request: https://github.com/apache/storm/pull/2283
Quick fix confusing exception message in ObjectReader.java
The exception message like `Don't know how to convert false + to boolean` can be confusing. I deleted `+`
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/Ethanlm/storm TYPO-IN-ObjectReader-CLASS
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/storm/pull/2283.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #2283
commit 8ed69fdeafc4cce8c1b11a42be0de6de309a1f8e
Author: Ethan Li
Date: 2017-08-18T14:04:56Z
    Quick fix confusing exception message in ObjectReader.java
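To illustrate the message being fixed here, the sketch below shows a strict boolean reader that rejects the string "false" and accepts only a real Boolean. `StrictBooleanReader` is a hypothetical class written for this note, not Storm's `ObjectReader`; the assumption is that the stray `+` sat inside a string literal, which is one plausible way to get the quoted message.

```java
// Hypothetical strict reader; illustrates the message fix, not Storm's actual ObjectReader code.
class StrictBooleanReader {

    // Assumed buggy variant: the "+" ended up inside the string literal.
    static String buggyMessage(Object o) {
        return "Don't know how to convert " + o + " + to boolean";
    }

    // Fixed variant without the stray "+".
    static String fixedMessage(Object o) {
        return "Don't know how to convert " + o + " to boolean";
    }

    // Accepts only Boolean values; a String such as "false" is rejected rather than parsed.
    static boolean getBoolean(Object o, boolean defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Boolean) {
            return (Boolean) o;
        }
        throw new IllegalArgumentException(fixedMessage(o));
    }
}
```

Under this sketch, a config value of `false` (Boolean) is read as expected, while `"false"` (String) fails with the cleaned-up message, which matches the String-versus-Boolean distinction the "uncompress" documentation fix is about.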
[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2233#discussion_r133946525 --- Diff: docs/Stream-API.md --- @@ -276,6 +276,21 @@ PairStreamscores = ... // list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20]) PairStream userScores = scores.window(...).groupByKey(); ``` + +### coGroupByKey + +`coGroupByKey` Groups the values of this stream with the values having the same key from the other stream. + +```java +// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3) +PairStream stream1 = ... + +// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3) +PairStream stream2 = ... --- End diff -- can be `PairStream stream1` to match with above comment.
[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2233#discussion_r133947663
--- Diff: storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessorextends BaseProcessor > implements BatchProcessor {
+    private final String firstStream;
+    private final String secondStream;
+    private final Multimap firstMap = ArrayListMultimap.create();
+    private final Multimap secondMap = ArrayListMultimap.create();
+
+
+    public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+        this.firstStream = firstStream;
+        this.secondStream = secondStream;
+    }
+
+    @Override
+    public void execute(Pair input, String sourceStream) {
+        K key = input.getFirst();
+        if (sourceStream.equals(firstStream)) {
+            V1 val = (V1) input.getSecond();
+            firstMap.put(key, val);
+        } else if (sourceStream.equals(secondStream)) {
+            V2 val = (V2) input.getSecond();
+            secondMap.put(key, val);
+        }
+        if (!context.isWindowed()) {
+            forwardValues();
+        }
+
+    }
+
+    @Override
+    public void finish() {
+        forwardValues();
+        firstMap.clear();
+        secondMap.clear();
+    }
+
+    private void forwardValues() {
+        Multimap immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
+        immutableFirstMap.asMap().forEach((key, values) -> {
+            context.forward(Pair.of(key, Pair.of(values, secondMap.removeAll(key))));
+        });
+
+        Multimap immutableSecondMap = ImmutableMultimap.copyOf(secondMap);
--- End diff --
Why is a copy required here? Can't you operate directly out of the firstMap and secondMap?
[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2233#discussion_r133947567
--- Diff: storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessorextends BaseProcessor > implements BatchProcessor {
+    private final String firstStream;
+    private final String secondStream;
+    private final Multimap firstMap = ArrayListMultimap.create();
+    private final Multimap secondMap = ArrayListMultimap.create();
+
+
+    public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+        this.firstStream = firstStream;
+        this.secondStream = secondStream;
+    }
+
+    @Override
+    public void execute(Pair input, String sourceStream) {
+        K key = input.getFirst();
+        if (sourceStream.equals(firstStream)) {
+            V1 val = (V1) input.getSecond();
+            firstMap.put(key, val);
+        } else if (sourceStream.equals(secondStream)) {
+            V2 val = (V2) input.getSecond();
+            secondMap.put(key, val);
+        }
+        if (!context.isWindowed()) {
+            forwardValues();
+        }
+
+    }
+
+    @Override
+    public void finish() {
+        forwardValues();
+        firstMap.clear();
+        secondMap.clear();
+    }
+
+    private void forwardValues() {
+        Multimap immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
--- End diff --
Why is a copy required here? Can't you operate directly out of the firstMap and secondMap?
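The reviewer's question can be explored with a small stand-alone sketch. It uses plain `java.util` maps rather than Guava's `Multimap`, and `CoGroupNoCopy` is a hypothetical class, so it only approximates the processor under review. The point it illustrates: iterating over the first map while removing entries from the second map touches two different collections, so no defensive copy is needed to avoid a `ConcurrentModificationException`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone co-group step using java.util maps instead of Guava multimaps.
class CoGroupNoCopy {

    // Returns one "key=firstValues|secondValues" entry for every key seen in either map.
    static List<String> forwardValues(Map<String, List<String>> first, Map<String, List<String>> second) {
        List<String> out = new ArrayList<>();
        // Iterate `first` directly; only `second` is modified inside the loop.
        for (Map.Entry<String, List<String>> e : first.entrySet()) {
            List<String> other = second.remove(e.getKey());
            if (other == null) {
                other = Collections.emptyList();
            }
            out.add(e.getKey() + "=" + e.getValue() + "|" + other);
        }
        // Whatever is left in `second` had no matching key in `first`.
        for (Map.Entry<String, List<String>> e : second.entrySet()) {
            out.add(e.getKey() + "=" + Collections.emptyList() + "|" + e.getValue());
        }
        return out;
    }
}
```

Fed with the values from the coGroupByKey documentation, (k1, v1), (k2, v2), (k2, v3) on the first side and (k1, x1), (k1, x2), (k3, x3) on the second, and using sorted maps for a stable order, this produces entries for k1, k2 and k3 that correspond to ([v1], [x1, x2]), ([v2, v3], []) and ([], [x3]).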
[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2233#discussion_r133946658 --- Diff: docs/Stream-API.md --- @@ -276,6 +276,21 @@ PairStreamscores = ... // list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20]) PairStream userScores = scores.window(...).groupByKey(); ``` + +### coGroupByKey + +`coGroupByKey` Groups the values of this stream with the values having the same key from the other stream. + +```java +// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3) +PairStream stream1 = ... + +// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3) +PairStream stream2 = ... + +// the co-grouped values per key in the last window, e.g. (k1, ([v1], [x1, x2]), (k2, ([v2, v3], [])), (k3, ([], [x3])) +PairStream userScores = stream1.window(...).coGroupByKey(stream2); --- End diff -- can be `PairStream coGroupedStream =`
[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2233#discussion_r133946510 --- Diff: docs/Stream-API.md --- @@ -276,6 +276,21 @@ PairStreamscores = ... // list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20]) PairStream userScores = scores.window(...).groupByKey(); ``` + +### coGroupByKey + +`coGroupByKey` Groups the values of this stream with the values having the same key from the other stream. + +```java +// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3) +PairStream stream1 = ... --- End diff -- can be `PairStream stream1` to match with above comment.
[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2233#discussion_r133947083
--- Diff: storm-client/src/jvm/org/apache/storm/streams/PairStream.java ---
@@ -380,6 +382,26 @@
         return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
     }
+    /**
+     * Groups the values of this stream with the values having the same key from the other stream.
+     * If stream1 has values - (k1, v1), (k2, v2), (k2, v3)
+     * and stream2 has values - (k1, x1), (k1, x2), (k3, x3) *
--- End diff --
Please format this so that it's easy to read in javadocs, and remove the extra `*` at the end of this line.
[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2233#discussion_r133948483
--- Diff: storm-client/test/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessorTest.java ---
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.processors;
+
+import org.apache.curator.shaded.com.google.common.collect.ImmutableBiMap;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMultimap;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Arrays;
+
+
+import static org.junit.Assert.assertEquals;
+
+public class CoGroupByKeyProcessorTest {
+    private CoGroupByKeyProcessorcoGroupByKeyProcessor;
+    private String firstStream = "first";
+    private String secondStream = "second";
+    private List >> res = new ArrayList<>();
+
+    private ProcessorContext context = new ProcessorContext() {
+        @Override
+        public void forward(T input) {
+            res.add((Pair >)input);
+        }
+
+        @Override
+        public void forward(T input, String stream) {
+        }
+
+        @Override
+        public boolean isWindowed() {
+            return true;
+        }
+
+        @Override
+        public Set getWindowedParentStreams() {
+            return null;
+        }
+    };
+
+    private List > firstKeyValeus = Arrays.asList(
+        Pair.of(2, 4),
+        Pair.of(5, 25),
+        Pair.of(7, 49),
+        Pair.of(7, 87)
+    );
+
+    private List > secondKeyValues = Arrays.asList(
+        Pair.of(1, 1),
+        Pair.of(2, 8),
+        Pair.of(5, 125),
+        Pair.of(5,50),
+        Pair.of(6, 216)
+
+    );
+
+    @Test
+    public void testCoGroupByKey() throws Exception {
+        coGroupByKeyProcessor = new CoGroupByKeyProcessor<>(firstStream, secondStream);
+        processValues();
+        List >> result = new ArrayList<>();
--- End diff --
result -> expected