[2/2] beam git commit: This closes #3845
This closes #3845 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2cef54ea Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2cef54ea Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2cef54ea Branch: refs/heads/mr-runner Commit: 2cef54ea2562c679d68ff6faed5598fc74b9811a Parents: b6f22aa 71b5e7c Author: Pei He <p...@apache.org> Authored: Wed Nov 8 14:29:21 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Wed Nov 8 14:29:21 2017 +0800 -- runners/map-reduce/pom.xml | 2 +- .../mapreduce/translation/BeamInputFormat.java | 22 +++--- .../mapreduce/translation/JobPrototype.java | 4 +- .../mapreduce/translation/ParDoOperation.java | 7 +- .../translation/SerializedPipelineOptions.java | 76 5 files changed, 20 insertions(+), 91 deletions(-) --
[1/2] beam git commit: mr-runner: use SerializablePipelineOptions to serde PipelineOptions
Repository: beam Updated Branches: refs/heads/mr-runner b6f22aa76 -> 2cef54ea2 mr-runner: use SerializablePipelineOptions to serde PipelineOptions Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71b5e7c4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71b5e7c4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71b5e7c4 Branch: refs/heads/mr-runner Commit: 71b5e7c45d1501030717cbfd608bfae36641de79 Parents: b6f22aa Author: huafengw <fvunic...@gmail.com> Authored: Wed Sep 13 10:24:24 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Wed Nov 8 14:28:41 2017 +0800 -- runners/map-reduce/pom.xml | 2 +- .../mapreduce/translation/BeamInputFormat.java | 22 +++--- .../mapreduce/translation/JobPrototype.java | 4 +- .../mapreduce/translation/ParDoOperation.java | 7 +- .../translation/SerializedPipelineOptions.java | 76 5 files changed, 20 insertions(+), 91 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/pom.xml -- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index 000f20c..7f2e851 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -93,7 +93,7 @@ - + org.apache.hadoop hadoop-mapreduce-client-core http://git-wip-us.apache.org/repos/asf/beam/blob/71b5e7c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java -- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java index 3d0b8ea..8a55c5e 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -31,6 +31,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; 
import java.util.List; + +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource; @@ -57,7 +59,7 @@ public class BeamInputFormat extends InputFormat { private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000; private List sources; - private SerializedPipelineOptions options; + private SerializablePipelineOptions options; public BeamInputFormat() { } @@ -73,8 +75,8 @@ public class BeamInputFormat extends InputFormat { } sources = (List) SerializableUtils.deserializeFromByteArray( Base64.decodeBase64(serializedBoundedSource), "TaggedSources"); -options = ((SerializedPipelineOptions) SerializableUtils.deserializeFromByteArray( -Base64.decodeBase64(serializedPipelineOptions), "SerializedPipelineOptions")); +options = ((SerializablePipelineOptions) SerializableUtils.deserializeFromByteArray( +Base64.decodeBase64(serializedPipelineOptions), "SerializablePipelineOptions")); try { @@ -86,7 +88,7 @@ public class BeamInputFormat extends InputFormat { final ReadOperation.TaggedSource taggedSource) { try { return FluentIterable.from(taggedSource.getSource().split( -DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.getPipelineOptions())) +DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options.get())) .transform(new Function<BoundedSource, ReadOperation.TaggedSource>() { @Override public ReadOperation.TaggedSource apply(BoundedSource input) { @@ -120,7 +122,7 @@ public class BeamInputFormat extends InputFormat { private static class BeamInputSplit extends InputSplit implements Writable { private String stepName; private BoundedSource boundedSource; -private SerializedPipelineOptions options; +private SerializablePipelineOptions options; private TupleTag tupleTag; public BeamInputSplit() { @@ -129,7 +131,7 @@ public class BeamInputFormat extends InputFormat { public BeamInputSplit( String 
stepName, BoundedSource boundedSource, -SerializedPipelineOptions options, +SerializablePipelineOptions options, TupleTag tupleTag) { this.stepName = checkNotNull(stepName, "stepName"); thi
[2/2] beam git commit: This closes #3846
This closes #3846 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd9c548b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd9c548b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd9c548b Branch: refs/heads/jstorm-runner Commit: cd9c548b6e1ee57f3a202da64e5e55bc6704b67f Parents: ef70031 03cc311 Author: Pei He <p...@apache.org> Authored: Thu Sep 14 19:29:31 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Sep 14 19:29:31 2017 +0800 -- .../beam/runners/jstorm/translation/TranslatorRegistry.java| 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) --
[1/2] beam git commit: JStorm-runner: Remove unnecessary WARN log, which might cause confusion.
Repository: beam Updated Branches: refs/heads/jstorm-runner ef70031b7 -> cd9c548b6 JStorm-runner: Remove unnecessary WARN log, which might case confusion. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03cc311c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03cc311c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03cc311c Branch: refs/heads/jstorm-runner Commit: 03cc311cfbabd92390d7a848a135b59d9d80530c Parents: ef70031 Author: basti.ljAuthored: Wed Sep 13 16:17:14 2017 +0800 Committer: basti.lj Committed: Wed Sep 13 16:17:58 2017 +0800 -- .../beam/runners/jstorm/translation/TranslatorRegistry.java| 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/03cc311c/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java index c8ea545..f297dc3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java @@ -48,10 +48,6 @@ class TranslatorRegistry { } public static TransformTranslator getTranslator(PTransform transform) { -TransformTranslator translator = TRANSLATORS.get(transform.getClass()); -if (translator == null) { - LOG.warn("Unsupported operator={}", transform.getClass().getName()); -} -return translator; +return TRANSLATORS.get(transform.getClass()); } }
[2/2] beam git commit: This closes #3841
This closes #3841 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa4ecea2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa4ecea2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa4ecea2 Branch: refs/heads/master Commit: fa4ecea26713244a83521ce1ca2864f4ad4409c8 Parents: 50532f0 31f51d2 Author: Pei He <p...@apache.org> Authored: Thu Sep 14 17:21:09 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Sep 14 17:21:09 2017 +0800 -- .../translation/functions/FlinkAssignContext.java | 17 - 1 file changed, 8 insertions(+), 9 deletions(-) --
[1/2] beam git commit: flink-runner: constructs exception string only when necessary, it reduces per-element expensive calls (String.format and getSimpleName) in FlinkAssignContext.
Repository: beam Updated Branches: refs/heads/master 50532f0a9 -> fa4ecea26 flink-runner: constructs exception string only when neccessary, it reduces per-element expensive calls(String.format and getSimpleName) in FlinkAssignContext. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31f51d28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31f51d28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31f51d28 Branch: refs/heads/master Commit: 31f51d28c574ea1792312a528b25793230787486 Parents: 50532f0 Author: Pei He <p...@apache.org> Authored: Tue Sep 12 17:26:28 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Sep 14 17:20:54 2017 +0800 -- .../translation/functions/FlinkAssignContext.java | 17 - 1 file changed, 8 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/31f51d28/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java index 447b1e5..26d6721 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.flink.translation.functions; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -35,13 +33,14 @@ class FlinkAssignContext<InputT, W extends BoundedWindow> FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue value) { fn.super(); -checkArgument( 
-Iterables.size(value.getWindows()) == 1, -String.format( -"%s passed to window assignment must be in a single window, but it was in %s: %s", -WindowedValue.class.getSimpleName(), -Iterables.size(value.getWindows()), -value.getWindows())); +if (Iterables.size(value.getWindows()) != 1) { + throw new IllegalArgumentException( + String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); +} this.value = value; }
[jira] [Created] (BEAM-2942) DOCUMENTATION in header becomes too long, and needs redesign
Pei He created BEAM-2942: Summary: DOCUMENTATION in header becomes too long, and needs redesign Key: BEAM-2942 URL: https://issues.apache.org/jira/browse/BEAM-2942 Project: Beam Issue Type: Bug Components: website Reporter: Pei He Assignee: Reuven Lax As we are adding more runners, the documentation in the header becomes too long. On my 15-inch Mac, I cannot see the last item -- SQL under DSL. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[2/4] beam git commit: jstorm-runner: support distribution metrics, this is required by nexmark.
jstorm-runner: support distribution metrics, this is required by nexmark. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c388447 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c388447 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c388447 Branch: refs/heads/jstorm-runner Commit: 0c388447813a93ab9afc45af591417a82f8c4b1b Parents: 56ad7a8 Author: Pei He <p...@apache.org> Authored: Wed Sep 6 11:05:05 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 8 14:42:28 2017 +0800 -- .../beam/runners/jstorm/TestJStormRunner.java | 1 + .../jstorm/translation/JStormMetricResults.java | 38 +++-- .../jstorm/translation/MetricsReporter.java | 56 3 files changed, 92 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/0c388447/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index b28c127..e9f6337 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -61,6 +61,7 @@ public class TestJStormRunner extends PipelineRunner { private TestJStormRunner(JStormPipelineOptions options) { this.options = options; Map conf = Maps.newHashMap(); +conf.put("topology.metric.sample.rate", 1); // Default state backend is RocksDB, for the users who could not run RocksDB on local testing // env, following config is used to configure state backend to memory. 
// conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString()); http://git-wip-us.apache.org/repos/asf/beam/blob/0c388447/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java index 986bf0c..01d4441 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java @@ -22,7 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.alibaba.jstorm.common.metric.AsmCounter; import com.alibaba.jstorm.common.metric.AsmGauge; import com.alibaba.jstorm.common.metric.AsmHistogram; +import com.alibaba.jstorm.common.metric.snapshot.AsmHistogramSnapshot; import com.alibaba.jstorm.metric.AsmWindow; +import com.alibaba.jstorm.metrics.Snapshot; import com.google.auto.value.AutoValue; import java.util.ArrayList; import java.util.List; @@ -87,7 +89,35 @@ public class JStormMetricResults extends MetricResults { new Instant(0; } -return JStormMetricQueryResults.create(counters, gauges); +List<MetricResult> distributions = new ArrayList<>(); +for (Map.Entry<String, AsmHistogram> entry : histogramMap.entrySet()) { + MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey()); + if (!MetricFiltering.matches(filter, metricKey)) { +continue; + } + AsmHistogram histogram = entry.getValue(); + histogram.forceFlush(); + + Snapshot snapshot = + ((AsmHistogramSnapshot) histogram.getSnapshots().get(AsmWindow.M10_WINDOW)).getSnapshot(); + // TODO: Sum and count might be under estimated, because JStorm histogram only store a fixed + // number of values. 
+ long sum = 0; + for (long v : snapshot.getValues()) { +sum += v; + } + distributions.add( + JStormMetricResult.create( + metricKey.metricName(), + metricKey.stepName(), + DistributionResult.create( + sum, + snapshot.size(), + snapshot.getMin(), + snapshot.getMax(; +} + +return JStormMetricQueryResults.create(counters, gauges, distributions); } @AutoValue @@ -97,8 +127,10 @@ public class JStormMetricResults extends MetricResults { public static MetricQueryResults create( Iterable<MetricResult> counters, -Iterable<MetricResult> gauges) { - return new AutoValue_JStormMetricResults_JStormMetricQueryResults(counters, gauges, null); +
[1/4] beam git commit: jstorm-runner: saves MetricResults in local JStormRunnerResult after cancelling.
Repository: beam Updated Branches: refs/heads/jstorm-runner d2b285122 -> ef70031b7 jstorm-runner: saves MetricResults in local JStormRunnerResult after cancelling. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56ad7a85 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56ad7a85 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56ad7a85 Branch: refs/heads/jstorm-runner Commit: 56ad7a852acf66ebd1061e276317481629b270c4 Parents: d2b2851 Author: Pei He <p...@apache.org> Authored: Wed Sep 6 11:05:51 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 8 14:42:27 2017 +0800 -- .../beam/runners/jstorm/JStormRunnerResult.java | 38 +++- .../beam/runners/jstorm/TestJStormRunner.java | 18 -- .../jstorm/translation/JStormMetricResults.java | 25 + 3 files changed, 56 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/56ad7a85/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 3962ca2..8848717 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -23,10 +23,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import backtype.storm.Config; import backtype.storm.LocalCluster; import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.common.metric.AsmMetric; import com.alibaba.jstorm.metric.AsmMetricRegistry; import com.alibaba.jstorm.metric.JStormMetrics; +import com.alibaba.jstorm.task.error.TaskReportErrorAndDie; import com.alibaba.jstorm.utils.JStormUtils; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import 
org.apache.beam.runners.jstorm.translation.CommonInstance; import org.apache.beam.runners.jstorm.translation.JStormMetricResults; @@ -62,6 +65,7 @@ public abstract class JStormRunnerResult implements PipelineResult { this.topologyName = checkNotNull(topologyName, "topologyName"); } + @Override public State getState() { return null; } @@ -79,6 +83,7 @@ public abstract class JStormRunnerResult implements PipelineResult { private final LocalCluster localCluster; private final long localModeExecuteTimeSecs; private boolean cancelled; +private MetricResults savedMetricResults; LocalJStormPipelineResult( String topologyName, @@ -89,12 +94,27 @@ public abstract class JStormRunnerResult implements PipelineResult { this.localCluster = checkNotNull(localCluster, "localCluster"); this.localModeExecuteTimeSecs = localModeExecuteTimeSecs; this.cancelled = false; + this.savedMetricResults = null; +} + +@Override +public State getState() { + if (cancelled) { +return State.CANCELLED; + } else if (globalWatermark() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { +return State.DONE; + } else { +return State.RUNNING; + } } @Override public State cancel() throws IOException { + savedMetricResults = metrics(); localCluster.killTopology(getTopologyName()); localCluster.shutdown(); + clearPAssertCount(); + TaskReportErrorAndDie.setExceptionRecord(null); JStormUtils.sleepMs(1000); cancelled = true; return State.CANCELLED; @@ -129,7 +149,12 @@ public abstract class JStormRunnerResult implements PipelineResult { @Override public MetricResults metrics() { - return new JStormMetricResults(); + if (savedMetricResults != null) { +return savedMetricResults; + } + AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics(); + return new JStormMetricResults( + metricRegistry.getCounters(), metricRegistry.getGauges(), metricRegistry.getHistograms()); } private long globalWatermark() { @@ -151,5 +176,16 @@ public abstract class JStormRunnerResult implements PipelineResult { return 
BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); } } + +private void clearPAssertCount() { + AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics(); + Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator(); + while (itr.hasNext()) { +Map.Entry<String, AsmMetric> metric = itr.next(); +if (metric.getKe
[3/4] beam git commit: jstorm-runner: support SourceMetrics.
jstorm-runner: support SourceMetrics. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fec423e5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fec423e5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fec423e5 Branch: refs/heads/jstorm-runner Commit: fec423e51148d26495fb8e6d17ac204b161f3069 Parents: 0c38844 Author: Pei He <p...@apache.org> Authored: Wed Sep 6 12:37:21 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 8 14:42:28 2017 +0800 -- runners/jstorm/pom.xml | 1 - .../translation/UnboundedSourceSpout.java | 30 ++-- 2 files changed, 22 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/fec423e5/runners/jstorm/pom.xml -- diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml index 681adb5..a433fcb 100644 --- a/runners/jstorm/pom.xml +++ b/runners/jstorm/pom.xml @@ -90,7 +90,6 @@ org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesSplittableParDo, -org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics, org.apache.beam.sdk.testing.UsesTestStream http://git-wip-us.apache.org/repos/asf/beam/blob/fec423e5/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java index 73f1f0d..92d2f24 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java @@ -26,12 +26,14 @@ import backtype.storm.tuple.Values; import com.alibaba.jstorm.metric.MetricClient; import com.alibaba.jstorm.metrics.Gauge; import com.alibaba.jstorm.utils.KryoSerializer; +import java.io.Closeable; import 
java.io.IOException; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -49,7 +51,7 @@ import org.slf4j.LoggerFactory; public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); - private final String name; + private final String stepName; private final String description; private final UnboundedSource source; private final SerializedPipelineOptions serializedOptions; @@ -58,6 +60,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou private transient JStormPipelineOptions pipelineOptions; private transient UnboundedSource.UnboundedReader reader; private transient SpoutOutputCollector collector; + private transient MetricsReporter metricsReporter; private volatile boolean hasNextRecord; private AtomicBoolean activated = new AtomicBoolean(); @@ -67,12 +70,12 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou private long lastWaterMark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); public UnboundedSourceSpout( - String name, + String stepName, String description, UnboundedSource source, JStormPipelineOptions options, TupleTag outputTag) { -this.name = name; +this.stepName = checkNotNull(stepName, "stepName"); this.description = checkNotNull(description, "description"); this.source = checkNotNull(source, "source"); this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); @@ -121,18 +124,23 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou this.pipelineOptions = 
this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); this.serializer = new KryoSerializer<>(conf); -createSourceReader(null); -new MetricClient(context).registerGauge( +// init metrics +MetricClient metricClient = new MetricClient(context); +metri
[4/4] beam git commit: This closes #3812
This closes #3812 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef70031b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef70031b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef70031b Branch: refs/heads/jstorm-runner Commit: ef70031b7b0e48ae5750725f8ff6a9cd71f37431 Parents: d2b2851 fec423e Author: Pei He <p...@apache.org> Authored: Fri Sep 8 14:43:37 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 8 14:43:37 2017 +0800 -- runners/jstorm/pom.xml | 1 - .../beam/runners/jstorm/JStormRunnerResult.java | 38 +++- .../beam/runners/jstorm/TestJStormRunner.java | 19 +- .../jstorm/translation/JStormMetricResults.java | 63 +--- .../jstorm/translation/MetricsReporter.java | 56 + .../translation/UnboundedSourceSpout.java | 30 +++--- 6 files changed, 170 insertions(+), 37 deletions(-) --
[2/2] beam git commit: This closes #3819
This closes #3819 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d2b28512 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d2b28512 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d2b28512 Branch: refs/heads/jstorm-runner Commit: d2b2851227ae47f985f0c7d3ff6f198cf9f725b7 Parents: d24d283 4349200 Author: Pei He <p...@apache.org> Authored: Fri Sep 8 14:24:01 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 8 14:24:01 2017 +0800 -- .../jstorm/serialization/BeamUtilsSerializer.java | 2 ++ .../runners/jstorm/translation/DoFnExecutor.java| 3 +-- .../runners/jstorm/translation/ExecutorsBolt.java | 1 - .../jstorm/translation/WindowAssignExecutor.java| 16 +++- 4 files changed, 10 insertions(+), 12 deletions(-) --
[1/2] beam git commit: JStorm-runner: Performance improvement 1. remove some logs on critical path 2. register "TimestampedValue" in Kryo to reduce the serialized size of event value
Repository: beam Updated Branches: refs/heads/jstorm-runner d24d2831d -> d2b285122 JStorm-runner: Performance improvement 1. remove some logs on critical path 2. register "TimestampedValue" in Kryo to reduce the serialized size of event value Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43492000 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43492000 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43492000 Branch: refs/heads/jstorm-runner Commit: 43492000a49e81b6d9a2420148fb2df1735301b0 Parents: d24d283 Author: basti.ljAuthored: Fri Sep 8 12:19:49 2017 +0800 Committer: basti.lj Committed: Fri Sep 8 12:19:49 2017 +0800 -- .../jstorm/serialization/BeamUtilsSerializer.java | 2 ++ .../runners/jstorm/translation/DoFnExecutor.java| 3 +-- .../runners/jstorm/translation/ExecutorsBolt.java | 1 - .../jstorm/translation/WindowAssignExecutor.java| 16 +++- 4 files changed, 10 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java index db1f037..8061a9f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Instant; /** @@ -110,5 +111,6 @@ public class BeamUtilsSerializer { 
Lists.newArrayList(w1), PaneInfo.NO_FIRING).getClass()); config.registerSerialization(WindowedValue.of(null, Instant.now(), Lists.newArrayList(w1, w2), PaneInfo.NO_FIRING).getClass()); +config.registerSerialization(TimestampedValue.class); } } http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index 5425b6c..4b021a3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -214,7 +214,6 @@ class DoFnExecutor implements Executor { } protected void processMainInput(WindowedValue elem) { -LOG.debug(String.format("Main input: tag=%s, elem=%s", mainInputTag, elem)); if (sideInputs.isEmpty()) { runner.processElement((WindowedValue) elem); } else { @@ -236,7 +235,7 @@ class DoFnExecutor implements Executor { } protected void processSideInput(TupleTag tag, WindowedValue elem) { -LOG.debug(String.format("Side inputs: tag=%s, elem=%s.", tag, elem)); +LOG.debug("Side inputs: tag={}, elem={}.", tag, elem); PCollectionView sideInputView = sideInputTagToView.get(tag); sideInputHandler.addSideInputValue(sideInputView, elem); http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index aca2ca4..1e9a4ff 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -295,7 +295,6 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { public void processExecutorElem(TupleTag inputTag, WindowedValue elem) { if (elem != null) { - LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag); Executor executor = inputTagToExecutor.get(inputTag); if (executor != null) {
[2/2] beam git commit: This closes #3816
This closes #3816 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d24d2831 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d24d2831 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d24d2831 Branch: refs/heads/jstorm-runner Commit: d24d2831d1529e4d81a024e2deb5dd31482a09c2 Parents: 80bd7f8 6a0d389 Author: Pei He <p...@apache.org> Authored: Thu Sep 7 19:09:58 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Sep 7 19:09:58 2017 +0800 -- .../jstorm/translation/ExecutorsBolt.java | 39 +- .../jstorm/translation/TimerService.java| 3 +- .../jstorm/translation/TimerServiceImpl.java| 54 +++- 3 files changed, 58 insertions(+), 38 deletions(-) --
[1/2] beam git commit: JStorm-runner: Implementation of processing timer
Repository: beam Updated Branches: refs/heads/jstorm-runner 80bd7f8be -> d24d2831d JStorm-runner: Implementation of processing timer Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a0d3896 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a0d3896 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a0d3896 Branch: refs/heads/jstorm-runner Commit: 6a0d389667369ec7d4f85469e6954d47097b7b68 Parents: 80bd7f8 Author: basti.lj <basti...@alibaba-inc.com> Authored: Thu Sep 7 18:42:10 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Sep 7 19:08:20 2017 +0800 -- .../jstorm/translation/ExecutorsBolt.java | 39 +- .../jstorm/translation/TimerService.java| 3 +- .../jstorm/translation/TimerServiceImpl.java| 54 +++- 3 files changed, 58 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/6a0d3896/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index 3d58a37..aca2ca4 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -19,12 +19,14 @@ package org.apache.beam.runners.jstorm.translation; import static com.google.common.base.Preconditions.checkNotNull; +import backtype.storm.Config; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBatchBolt; import backtype.storm.tuple.ITupleExt; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.TupleUtils; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.cache.KvStoreManagerFactory; import 
com.alibaba.jstorm.cluster.Common; @@ -44,6 +46,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -230,20 +233,25 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { @Override public void execute(Tuple input) { -// process a batch -String streamId = input.getSourceStreamId(); -ITupleExt tuple = (ITupleExt) input; -Iterator<List> valueIterator = tuple.batchValues().iterator(); -if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { - while (valueIterator.hasNext()) { -processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); - } +if (TupleUtils.isTick(input)) { + // tick to trigger processing timer + timerService.fireTimers(Instant.now().getMillis(), TimeDomain.PROCESSING_TIME); } else { - doFnStartBundle(); - while (valueIterator.hasNext()) { -processElement(valueIterator.next(), streamId); + // process a batch + String streamId = input.getSourceStreamId(); + ITupleExt tuple = (ITupleExt) input; + Iterator<List> valueIterator = tuple.batchValues().iterator(); + if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) { +while (valueIterator.hasNext()) { + processWatermark((Long) valueIterator.next().get(0), input.getSourceTask()); +} + } else { +doFnStartBundle(); +while (valueIterator.hasNext()) { + processElement(valueIterator.next(), streamId); +} +doFnFinishBundle(); } - doFnFinishBundle(); } } @@ -256,7 +264,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { if (newWaterMark != 0) { // Some buffer windows are going to be triggered. 
doFnStartBundle(); - timerService.fireTimers(newWaterMark); + timerService.fireTimers(newWaterMark, TimeDomain.EVENT_TIME); // SideInput: If receiving water mark with max timestamp, It means no more data is supposed // to be received from now on. So we are going to process all push back data. @@ -310,7 +318,10 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { @Override public Map<String, Object> getComponentConfiguration() { -return null; +Map conf = Maps.newHashMap(); +// Add tick tuple for triggering processing timer +conf.put(Config.TOPOLOGY_TICK_TUPL
[2/2] beam git commit: This closes #3806
This closes #3806 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/80bd7f8b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/80bd7f8b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/80bd7f8b Branch: refs/heads/jstorm-runner Commit: 80bd7f8becf870a49cf930d6ffe5b64dec2104eb Parents: 7a28bf1 34bf5af Author: Pei He <p...@apache.org> Authored: Thu Sep 7 14:13:17 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Sep 7 14:13:17 2017 +0800 -- .../beam/runners/jstorm/TestJStormRunner.java | 32 +--- 1 file changed, 28 insertions(+), 4 deletions(-) --
[1/2] beam git commit: jstorm-runner: handle UserCodeException in TestJStormRunner, and wraps in PipelineExecutionException if receives a checked Exception.
Repository: beam Updated Branches: refs/heads/jstorm-runner 7a28bf1af -> 80bd7f8be jstorm-runner: handle UserCodeException in TestJStormRunner, and wraps in PipelineExecutionException if receives a checked Exception. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34bf5af9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34bf5af9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34bf5af9 Branch: refs/heads/jstorm-runner Commit: 34bf5af9b19c2fb90f2301823b698d06f21879a1 Parents: 7a28bf1 Author: Pei He <p...@apache.org> Authored: Mon Sep 4 15:02:14 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Sep 7 14:12:51 2017 +0800 -- .../beam/runners/jstorm/TestJStormRunner.java | 32 +--- 1 file changed, 28 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/34bf5af9/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index 9d2e2f1..b637b7c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.util.UserCodeException; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,23 +85,23 @@ public class TestJStormRunner extends PipelineRunner { result.getTopologyName(), numberOfAssertions); if (numberOfAssertions == 0) { result.waitUntilFinish(Duration.millis(RESULT_WAITING_TIME_MS)); -Exception taskExceptionRec = 
TaskReportErrorAndDie.getExceptionRecord(); +Throwable taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); if (taskExceptionRec != null) { LOG.info("Exception was found.", taskExceptionRec); - throw new RuntimeException(taskExceptionRec.getCause()); + handleTaskException(taskExceptionRec); } return result; } else { for (int waitTime = 0; waitTime <= ASSERTION_WAITING_TIME_MS;) { Optional success = checkForPAssertSuccess(result.metrics(), numberOfAssertions); - Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); + Throwable taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); if (success.isPresent() && success.get()) { return result; } else if (success.isPresent() && !success.get()) { throw new AssertionError("Failed assertion checks."); } else if (taskExceptionRec != null) { LOG.info("Exception was found.", taskExceptionRec); -throw new RuntimeException(taskExceptionRec.getCause()); +handleTaskException(taskExceptionRec); } else { JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS); waitTime += RESULT_CHECK_INTERVAL_MS; @@ -116,6 +117,29 @@ public class TestJStormRunner extends PipelineRunner { } } + private void handleTaskException(Throwable taskExceptionRec) { +Throwable cause; +if (taskExceptionRec.getCause() != null) { + cause = taskExceptionRec.getCause(); +} else { + cause = taskExceptionRec; +} + +UserCodeException innermostUserCodeException = null; +for (Throwable current = cause; current.getCause() != null; current = current.getCause()) { + if (current instanceof UserCodeException) { +innermostUserCodeException = ((UserCodeException) current); + } +} +if (innermostUserCodeException != null) { + cause = innermostUserCodeException.getCause(); +} +if (cause instanceof AssertionError) { + throw (AssertionError) cause; +} +throw new Pipeline.PipelineExecutionException(cause); + } + private Optional checkForPAssertSuccess( MetricResults metricResults, int expectedNumberOfAssertions) {
[jira] [Assigned] (BEAM-2846) Support SplittableParDo in MapReduce runner
[ https://issues.apache.org/jira/browse/BEAM-2846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He reassigned BEAM-2846: Assignee: (was: Pei He) > Support SplittableParDo in MapReduce runner > --- > > Key: BEAM-2846 > URL: https://issues.apache.org/jira/browse/BEAM-2846 > Project: Beam > Issue Type: New Feature > Components: runner-mapreduce >Reporter: Pei He > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2846) Support SplittableParDo in MapReduce runner
Pei He created BEAM-2846: Summary: Support SplittableParDo in MapReduce runner Key: BEAM-2846 URL: https://issues.apache.org/jira/browse/BEAM-2846 Project: Beam Issue Type: New Feature Components: runner-mapreduce Reporter: Pei He Assignee: Pei He -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2845) Support TimersInParDo in MapReduce runner
Pei He created BEAM-2845: Summary: Support TimersInParDo in MapReduce runner Key: BEAM-2845 URL: https://issues.apache.org/jira/browse/BEAM-2845 Project: Beam Issue Type: New Feature Components: runner-mapreduce Reporter: Pei He -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[4/4] beam git commit: This closes #3792
This closes #3792 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7a28bf1a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7a28bf1a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7a28bf1a Branch: refs/heads/jstorm-runner Commit: 7a28bf1af44bb498b5702031e64edd3d918245a6 Parents: 9148899 6f40506 Author: Pei He <p...@apache.org> Authored: Mon Sep 4 12:58:50 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Mon Sep 4 12:58:50 2017 +0800 -- .../beam/runners/jstorm/JStormRunnerResult.java | 53 +- .../beam/runners/jstorm/TestJStormRunner.java | 100 +++--- .../jstorm/translation/CommonInstance.java | 5 + .../jstorm/translation/DoFnExecutor.java| 5 +- .../jstorm/translation/ExecutorsBolt.java | 35 ++- .../translation/GroupByWindowExecutor.java | 2 +- .../jstorm/translation/JStormMetricResults.java | 105 +++ .../jstorm/translation/MetricsReporter.java | 36 ++- .../translation/UnboundedSourceSpout.java | 38 --- 9 files changed, 308 insertions(+), 71 deletions(-) --
[3/4] beam git commit: [BEAM-2824] Uses PipelineResult in TestJStormRunner.
[BEAM-2824] Uses PipelineResult in TestJStormRunner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f40506a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f40506a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f40506a Branch: refs/heads/jstorm-runner Commit: 6f40506a979bdcac3d1125bbe809b092d497a2f6 Parents: df75d80 Author: Pei He <p...@apache.org> Authored: Wed Aug 30 14:50:20 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Mon Sep 4 12:57:53 2017 +0800 -- .../beam/runners/jstorm/JStormRunnerResult.java | 6 ++ .../beam/runners/jstorm/TestJStormRunner.java | 100 --- .../jstorm/translation/DoFnExecutor.java| 1 - 3 files changed, 68 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/6f40506a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 782896e..3962ca2 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -78,6 +78,7 @@ public abstract class JStormRunnerResult implements PipelineResult { private final LocalCluster localCluster; private final long localModeExecuteTimeSecs; +private boolean cancelled; LocalJStormPipelineResult( String topologyName, @@ -87,6 +88,7 @@ public abstract class JStormRunnerResult implements PipelineResult { super(topologyName, config); this.localCluster = checkNotNull(localCluster, "localCluster"); this.localModeExecuteTimeSecs = localModeExecuteTimeSecs; + this.cancelled = false; } @Override @@ -94,11 +96,15 @@ public abstract class JStormRunnerResult implements PipelineResult { localCluster.killTopology(getTopologyName()); localCluster.shutdown(); 
JStormUtils.sleepMs(1000); + cancelled = true; return State.CANCELLED; } @Override public State waitUntilFinish(Duration duration) { + if (cancelled) { +return State.CANCELLED; + } Sleeper sleeper = Sleeper.DEFAULT; BackOff backOff = FluentBackoff.DEFAULT.withMaxCumulativeBackoff(duration).backoff(); try { http://git-wip-us.apache.org/repos/asf/beam/blob/6f40506a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index c9990e4..9d2e2f1 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -21,10 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.alibaba.jstorm.common.metric.AsmMetric; import com.alibaba.jstorm.metric.AsmMetricRegistry; -import com.alibaba.jstorm.metric.AsmWindow; import com.alibaba.jstorm.metric.JStormMetrics; -import com.alibaba.jstorm.metric.MetaType; -import com.alibaba.jstorm.metric.MetricType; import com.alibaba.jstorm.task.error.TaskReportErrorAndDie; import com.alibaba.jstorm.utils.JStormUtils; import com.google.common.base.Optional; @@ -34,8 +31,13 @@ import java.util.Iterator; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.PAssert; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,31 +82,32 @@ public class TestJStormRunner extends PipelineRunner { LOG.info("Running JStorm job {} 
with {} expected assertions.", result.getTopologyName(), numberOfAssertions); - - int maxTimeoutMs = - numberOfAssertions > 0 ? ASSERTION_WAITING_TIME_MS : RESULT_WAITING_TIME_MS; - for (int waitTime = 0; waitTime <= maxTimeoutMs; ) { -Optional success = numberOfAssertions > 0 -? checkForPAssertSuccess(numberOfAssertions) : Optional.absent(); + if (numberOfAssertions == 0) { +
[2/4] beam git commit: [BEAM-2824] support PipelineResult.waitUntilFinish() in jstorm local mode.
[BEAM-2824] support PipelineResult.waitUntilFinish() in jstorm local mode. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df75d807 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df75d807 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df75d807 Branch: refs/heads/jstorm-runner Commit: df75d8074d7067bf6078289f1f2dfa36548fcd5e Parents: cda4e62 Author: Pei He <p...@apache.org> Authored: Tue Aug 29 20:10:06 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Mon Sep 4 12:57:50 2017 +0800 -- .../beam/runners/jstorm/JStormRunnerResult.java | 44 +++- .../jstorm/translation/CommonInstance.java | 5 +++ .../jstorm/translation/DoFnExecutor.java| 4 +- .../jstorm/translation/ExecutorsBolt.java | 35 +--- .../translation/GroupByWindowExecutor.java | 2 +- .../jstorm/translation/MetricsReporter.java | 6 +-- .../translation/UnboundedSourceSpout.java | 38 ++--- 7 files changed, 104 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 98d967f..782896e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -17,15 +17,26 @@ */ package org.apache.beam.runners.jstorm; +import static com.alibaba.jstorm.metric.AsmWindow.M10_WINDOW; import static com.google.common.base.Preconditions.checkNotNull; import backtype.storm.Config; import backtype.storm.LocalCluster; +import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.metric.AsmMetricRegistry; +import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.utils.JStormUtils; import 
java.io.IOException; +import java.util.Map; +import org.apache.beam.runners.jstorm.translation.CommonInstance; import org.apache.beam.runners.jstorm.translation.JStormMetricResults; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.joda.time.Duration; /** @@ -88,12 +99,21 @@ public abstract class JStormRunnerResult implements PipelineResult { @Override public State waitUntilFinish(Duration duration) { - JStormUtils.sleepMs(duration.getMillis()); + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backOff = FluentBackoff.DEFAULT.withMaxCumulativeBackoff(duration).backoff(); try { -return cancel(); +do { + if (globalWatermark() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { +return State.DONE; + } +} while (BackOffUtils.next(sleeper, backOff)); + } catch (InterruptedException e) { +Thread.currentThread().interrupt(); +// Ignore InterruptedException } catch (IOException e) { throw new RuntimeException(e); } + return State.RUNNING; } @Override @@ -105,5 +125,25 @@ public abstract class JStormRunnerResult implements PipelineResult { public MetricResults metrics() { return new JStormMetricResults(); } + +private long globalWatermark() { + AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics(); + boolean foundWatermark = false; + double min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + for (Map.Entry<String, AsmGauge> entry : metricRegistry.getGauges().entrySet()) { +if (entry.getKey().endsWith(CommonInstance.BEAM_OUTPUT_WATERMARK_METRICS)) { + foundWatermark = true; + double outputWatermark = (double) entry.getValue().getValue(M10_WINDOW); + if (outputWatermark < min) { +min = outputWatermark; + } +} + } + if (foundWatermark) { +return (long) min; + } else { 
+return BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + } +} } } http://git-wip-us.apache.org/repos/asf/beam/blob/df75d807/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/ru
[1/4] beam git commit: [BEAM-2824] support gauge and PipelineResults.metrics() in local mode.
Repository: beam Updated Branches: refs/heads/jstorm-runner 914889925 -> 7a28bf1af [BEAM-2824] support gauge and PipelineResults.metrics() in local mode. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cda4e629 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cda4e629 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cda4e629 Branch: refs/heads/jstorm-runner Commit: cda4e6293a13d387cee5c2920335b6bd053574d7 Parents: 9148899 Author: Pei He <p...@apache.org> Authored: Wed Aug 30 15:16:44 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 13:52:47 2017 +0800 -- .../beam/runners/jstorm/JStormRunnerResult.java | 3 +- .../jstorm/translation/JStormMetricResults.java | 105 +++ .../jstorm/translation/MetricsReporter.java | 30 +- 3 files changed, 136 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/cda4e629/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index b6b5281..98d967f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -23,6 +23,7 @@ import backtype.storm.Config; import backtype.storm.LocalCluster; import com.alibaba.jstorm.utils.JStormUtils; import java.io.IOException; +import org.apache.beam.runners.jstorm.translation.JStormMetricResults; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; import org.joda.time.Duration; @@ -102,7 +103,7 @@ public abstract class JStormRunnerResult implements PipelineResult { @Override public MetricResults metrics() { - throw new UnsupportedOperationException("This method is not yet supported."); + return new 
JStormMetricResults(); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/cda4e629/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java new file mode 100644 index 000..dbaa28e --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation; + +import com.alibaba.jstorm.common.metric.AsmCounter; +import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.metric.AsmMetricRegistry; +import com.alibaba.jstorm.metric.AsmWindow; +import com.alibaba.jstorm.metric.JStormMetrics; +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.MetricFiltering; +import org.apache.beam.runners.core.metrics.MetricKey; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.joda.time.Instant; + +/** + * Implementation of {@link MetricResults} for the JStorm Runner. + */ +public class JStormMetricResults extends MetricResults { + @Override + public MetricQueryResults queryMetrics(MetricsFilter fi
[jira] [Created] (BEAM-2839) MR runner: add runner users guide under documentation/runners
Pei He created BEAM-2839: Summary: MR runner: add runner users guide under documentation/runners Key: BEAM-2839 URL: https://issues.apache.org/jira/browse/BEAM-2839 Project: Beam Issue Type: Task Components: runner-mapreduce Reporter: Pei He Assignee: Pei He -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2838) MR runner: update the capability matrix with the current status.
Pei He created BEAM-2838: Summary: MR runner: update the capability matrix with the current status. Key: BEAM-2838 URL: https://issues.apache.org/jira/browse/BEAM-2838 Project: Beam Issue Type: Task Components: runner-mapreduce Reporter: Pei He Assignee: Pei He -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2835) Support executing MapReduce jobs in parallel.
Pei He created BEAM-2835: Summary: Support executing MapReduce jobs in parallel. Key: BEAM-2835 URL: https://issues.apache.org/jira/browse/BEAM-2835 Project: Beam Issue Type: New Feature Components: runner-mapreduce Reporter: Pei He Currently, the runner executes MR jobs sequentially in topological order. This is very inefficient; it would be good to allow jobs to be executed in parallel if they don't depend on each other. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2830) Main thread needs to re-throw worker exceptions for ValidatesRunner tests that expects exceptions.
Pei He created BEAM-2830: Summary: Main thread needs to re-throw worker exceptions for ValidatesRunner tests that expects exceptions. Key: BEAM-2830 URL: https://issues.apache.org/jira/browse/BEAM-2830 Project: Beam Issue Type: New Feature Components: runner-mapreduce Reporter: Pei He -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-2783) Support Counters/Metrics and run ValidatesRunner tests
[ https://issues.apache.org/jira/browse/BEAM-2783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He reassigned BEAM-2783: Assignee: Pei He > Support Counters/Metrics and run ValidatesRunner tests > -- > > Key: BEAM-2783 > URL: https://issues.apache.org/jira/browse/BEAM-2783 > Project: Beam > Issue Type: New Feature > Components: runner-mapreduce >Reporter: Pei He >Assignee: Pei He > > It is important to be able to run ValidatesRunner tests. > And, we need to wire the MapReduce runner with the framework's counters. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[3/3] beam git commit: This closes #3789
This closes #3789 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/91488992 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/91488992 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/91488992 Branch: refs/heads/jstorm-runner Commit: 9148899254ea873b4a2c5f3314fa30c3b633cea9 Parents: e00e0e8 c952686 Author: Pei He <p...@apache.org> Authored: Wed Aug 30 14:57:48 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Wed Aug 30 14:57:48 2017 +0800 -- .../java/org/apache/beam/runners/jstorm/JStormRunnerResult.java | 5 +++-- .../apache/beam/runners/jstorm/translation/MetricsReporter.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) --
[2/3] beam git commit: jstorm-runner: Fix incorrect updating of counter metrics
jstorm-runner: Fix incorrect updating of counter metrics Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9526869 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9526869 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9526869 Branch: refs/heads/jstorm-runner Commit: c95268691f78a629866f722df1a3f7ef5e76a256 Parents: 557d703 Author: basti.lj <basti...@alibaba-inc.com> Authored: Wed Aug 30 10:45:45 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Wed Aug 30 14:55:17 2017 +0800 -- .../apache/beam/runners/jstorm/translation/MetricsReporter.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/c9526869/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java index e7f3285..0315a59 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java @@ -72,7 +72,7 @@ class MetricsReporter { AsmCounter counter = metricClient.registerCounter(metricName); Long incValue = (oldValue == null ? updateValue : updateValue - oldValue); counter.update(incValue); -reportedCounters.put(metricName, incValue); +reportedCounters.put(metricName, updateValue); } } }
[1/3] beam git commit: jstorm-runner: Fix the bug that max waiting time is missing on local mode
Repository: beam Updated Branches: refs/heads/jstorm-runner e00e0e841 -> 914889925 jstorm-runner: Fix the bug that max waiting time is missing on local mode Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/557d7036 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/557d7036 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/557d7036 Branch: refs/heads/jstorm-runner Commit: 557d7036efe3bcb83ea99ca14ad052052bab5add Parents: e00e0e8 Author: basti.lj <basti...@alibaba-inc.com> Authored: Wed Aug 30 10:45:17 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Wed Aug 30 14:55:15 2017 +0800 -- .../java/org/apache/beam/runners/jstorm/JStormRunnerResult.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/557d7036/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 4b1850e..b6b5281 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -64,8 +64,8 @@ public abstract class JStormRunnerResult implements PipelineResult { private static class LocalJStormPipelineResult extends JStormRunnerResult { -private LocalCluster localCluster; -private long localModeExecuteTimeSecs; +private final LocalCluster localCluster; +private final long localModeExecuteTimeSecs; LocalJStormPipelineResult( String topologyName, @@ -74,6 +74,7 @@ public abstract class JStormRunnerResult implements PipelineResult { long localModeExecuteTimeSecs) { super(topologyName, config); this.localCluster = checkNotNull(localCluster, "localCluster"); + this.localModeExecuteTimeSecs = localModeExecuteTimeSecs; } @Override
[jira] [Created] (BEAM-2824) Support PipelineResult in JStormRunner
Pei He created BEAM-2824: Summary: Support PipelineResult in JStormRunner Key: BEAM-2824 URL: https://issues.apache.org/jira/browse/BEAM-2824 Project: Beam Issue Type: New Feature Components: runner-jstorm Reporter: Pei He Assignee: Pei He Here are the work items: 1. supports metrics() in local mode. 2. supports waitUntilFinish() in local mode. 3. uses PipelineResult in TestJStormRunner. 4. supports metrics() in cluster mode. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-1763) TestPipeline should ensure that all assertions succeeded
[ https://issues.apache.org/jira/browse/BEAM-1763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146547#comment-16146547 ] Pei He commented on BEAM-1763: -- Do we require runners to quiesce when bounded data has been processed? Otherwise, a runner that supports metrics but doesn't quiesce (runs in streaming mode) still cannot use TestPipeline. > TestPipeline should ensure that all assertions succeeded > > > Key: BEAM-1763 > URL: https://issues.apache.org/jira/browse/BEAM-1763 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Thomas Groh >Assignee: Aviem Zur > Fix For: 2.0.0 > > > This doesn't need to be part of each {{PipelineRunner}} implementation if it > goes through the {{PipelineResult}} APIs. The assertion can be of the form > that if the Pipeline is finished, then the number of successful assertions is > equal to the total number of assertions. > Suggested solution: > For runners which support metrics, use the counters for successful/failed > assertions and compare them to expected number of assertions. > Runners which do not support metrics should either implement metrics or > override {{PAssert}} in a way that verifies its execution. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (BEAM-2757) Link to Jenkins view is dead
[ https://issues.apache.org/jira/browse/BEAM-2757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He closed BEAM-2757. Resolution: Fixed Assignee: Pei He (was: Jason Kuster) Fix Version/s: Not applicable I think this is fixed in https://github.com/apache/beam-site/pull/283 > Link to Jenkins view is dead > > > Key: BEAM-2757 > URL: https://issues.apache.org/jira/browse/BEAM-2757 > Project: Beam > Issue Type: Bug > Components: website >Reporter: Kenneth Knowles >Assignee: Pei He > Fix For: Not applicable > > > Discovered since the dead link check is failing on all website PRs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (BEAM-2126) Add JStorm runner to "Contribute > Technical References > Ongoing Projects"
[ https://issues.apache.org/jira/browse/BEAM-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He closed BEAM-2126. Resolution: Fixed Fix Version/s: Not applicable > Add JStorm runner to "Contribute > Technical References > Ongoing Projects" > --- > > Key: BEAM-2126 > URL: https://issues.apache.org/jira/browse/BEAM-2126 > Project: Beam > Issue Type: Improvement > Components: runner-jstorm, website >Reporter: Kenneth Knowles >Assignee: Pei He > Fix For: Not applicable > > > We should have this effort listed here: > https://beam.apache.org/contribute/work-in-progress/ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2783) Support Counters/Metrics and run ValidatesRunner tests
[ https://issues.apache.org/jira/browse/BEAM-2783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He updated BEAM-2783: - Issue Type: New Feature (was: Bug) > Support Counters/Metrics and run ValidatesRunner tests > -- > > Key: BEAM-2783 > URL: https://issues.apache.org/jira/browse/BEAM-2783 > Project: Beam > Issue Type: New Feature > Components: runner-mapreduce >Reporter: Pei He > > It is important to be able to run ValidatesRunner tests. > We also need to wire the MapReduce runner to the framework's counters. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (BEAM-2783) Support Counters/Metrics and run ValidatesRunner tests
Pei He created BEAM-2783: Summary: Support Counters/Metrics and run ValidatesRunner tests Key: BEAM-2783 URL: https://issues.apache.org/jira/browse/BEAM-2783 Project: Beam Issue Type: Bug Components: runner-mapreduce Reporter: Pei He It is important to be able to run ValidatesRunner tests. We also need to wire the MapReduce runner to the framework's counters. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[18/53] [abbrv] beam git commit: jstorm-runner: remove top level classes RunnerUtils and SingletonKeyedWorkItem.
jstorm-runner: remove top level classes RunnerUtils and SingletonKeyedWorkItem. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74ceac61 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74ceac61 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74ceac61 Branch: refs/heads/jstorm-runner Commit: 74ceac6173f78c76247b9ea4cb8179ca1ed9f62d Parents: 8265353 Author: Pei He <p...@apache.org> Authored: Fri Jul 14 15:40:23 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:57 2017 +0800 -- .../translation/GroupByWindowExecutor.java | 45 +- .../runners/jstorm/translation/RunnerUtils.java | 51 .../translation/SingletonKeyedWorkItem.java | 62 .../jstorm/translation/TranslationContext.java | 13 +++- 4 files changed, 56 insertions(+), 115 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/74ceac61/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java index bf6e1ad..1c858b7 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableList; import java.io.Serializable; +import java.util.Collections; import java.util.List; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -152,7 +153,7 @@ class GroupByWindowExecutor<K, V> * For GroupByKey, KV type elem is received. We need to convert the KV elem * into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner. 
*/ -KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem); +KeyedWorkItem<K, V> keyedWorkItem = toKeyedWorkItem((WindowedValue<KV<K, V>>) elem); runner.processElement(elem.withValue(keyedWorkItem)); } @@ -170,4 +171,46 @@ class GroupByWindowExecutor<K, V> public String toString() { return super.toString(); } + + private <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> kvElem) { +SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of( +kvElem.getValue().getKey(), +kvElem.withValue(kvElem.getValue().getValue())); +return workItem; + } + + private static class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { + +final K key; +final WindowedValue value; + +private SingletonKeyedWorkItem(K key, WindowedValue value) { + this.key = key; + this.value = value; +} + +public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of( +K key, WindowedValue value) { + return new SingletonKeyedWorkItem<>(key, value); +} + +@Override +public K key() { + return key; +} + +public WindowedValue value() { + return value; +} + +@Override +public Iterable timersIterable() { + return Collections.EMPTY_LIST; +} + +@Override +public Iterable<WindowedValue> elementsIterable() { + return Collections.singletonList(value); +} + } } http://git-wip-us.apache.org/repos/asf/beam/blob/74ceac61/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java deleted file mode 100644 index 4f469f3..000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/
[33/53] [abbrv] beam git commit: jstorm-runner: Throw AssertionError instead of RuntimeException when the pipeline encounters an exception
jstorm-runner: Throw AssertionError instead of RuntimeException when pipeline encounter exception Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af5221c0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af5221c0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af5221c0 Branch: refs/heads/jstorm-runner Commit: af5221c001678e36de6492fa20b3fc4026f486e8 Parents: dc6f63c Author: basti.lj <basti...@alibaba-inc.com> Authored: Tue Jul 18 14:50:19 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:58 2017 +0800 -- .../beam/runners/jstorm/TestJStormRunner.java | 41 ++-- 1 file changed, 21 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/af5221c0/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index a117675..0088cf9 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -47,6 +47,7 @@ public class TestJStormRunner extends PipelineRunner { @Override public JStormRunnerResult run(Pipeline pipeline) { +TaskReportErrorAndDie.setExceptionRecord(null); JStormRunnerResult result = stormRunner.run(pipeline); try { @@ -54,30 +55,30 @@ public class TestJStormRunner extends PipelineRunner { LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions); - if (numberOfAssertions == 0) { -// If assert number is zero, wait 5 sec -JStormUtils.sleepMs(5000); + + int maxTimeoutSec = numberOfAssertions > 0 ? 20 : 5; + for (int waitTime = 0; waitTime <= maxTimeoutSec * 1000; ) { +Optional success = numberOfAssertions > 0 +? 
checkForPAssertSuccess(numberOfAssertions) : Optional.absent(); Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); -if (taskExceptionRec != null) { - throw new RuntimeException(taskExceptionRec.getCause()); -} -return result; - } else { -for (int i = 0; i < 40; ++i) { - Optional success = checkForPAssertSuccess(numberOfAssertions); - Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); - if (success.isPresent() && success.get()) { -return result; - } else if (success.isPresent() && !success.get()) { -throw new AssertionError("Failed assertion checks."); - } else if (taskExceptionRec != null) { -throw new RuntimeException(taskExceptionRec.getCause()); - } else { -JStormUtils.sleepMs(500); - } +if (success.isPresent() && success.get()) { + return result; +} else if (success.isPresent() && !success.get()) { + throw new AssertionError("Failed assertion checks."); +} else if (taskExceptionRec != null) { + LOG.info("Exception was found.", taskExceptionRec); + throw new AssertionError(taskExceptionRec.getCause()); +} else { + JStormUtils.sleepMs(500); + waitTime += 500; } + } + + if (numberOfAssertions > 0) { LOG.info("Assertion checks timed out."); throw new AssertionError("Assertion checks timed out."); + } else { +return result; } } finally { clearPAssertCount();
[27/53] [abbrv] beam git commit: jstorm-runner: move jstorm state implementations to JStormStateInternals inner classes.
jstorm-runner: move jstorm state implementations to JStormStateInternals inner classes. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9abbbd06 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9abbbd06 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9abbbd06 Branch: refs/heads/jstorm-runner Commit: 9abbbd064e878a961ff3e8fc62d96ea650fd7570 Parents: 8cdd41b Author: Pei He <p...@apache.org> Authored: Fri Jul 14 16:10:29 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:57 2017 +0800 -- .../jstorm/translation/JStormBagState.java | 180 --- .../translation/JStormCombiningState.java | 88 .../jstorm/translation/JStormMapState.java | 158 --- .../translation/JStormStateInternals.java | 464 +++ .../jstorm/translation/JStormValueState.java| 82 .../translation/JStormWatermarkHoldState.java | 82 6 files changed, 464 insertions(+), 590 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/9abbbd06/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java deleted file mode 100644 index 3e5d52b..000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.alibaba.jstorm.cache.ComposedKey; -import com.alibaba.jstorm.cache.IKvStore; -import com.alibaba.jstorm.cache.KvStoreIterable; -import java.io.IOException; -import java.util.Iterator; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.ReadableState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation of {@link BagState} in JStorm runner. - */ -class JStormBagState<K, T> implements BagState { - private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class); - - @Nullable - private final K key; - private final StateNamespace namespace; - private final IKvStore<ComposedKey, T> kvState; - private final IKvStore<ComposedKey, Object> stateInfoKvState; - private int elemIndex; - - public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState, -IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException { -this.key = key; -this.namespace = checkNotNull(namespace, "namespace"); -this.kvState = checkNotNull(kvState, "kvState"); -this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState"); - -Integer index = (Integer) stateInfoKvState.get(getComposedKey()); -this.elemIndex = index != null ? 
++index : 0; - } - - @Override - public void add(T input) { -try { - kvState.put(getComposedKey(elemIndex), input); - stateInfoKvState.put(getComposedKey(), elemIndex); - elemIndex++; -} catch (IOException e) { - throw new RuntimeException(e.getCause()); -} - } - - @Override - public ReadableState isEmpty() { -return new ReadableState() { - @Override - public Boolean read() { -return elemIndex <= 0; - } - - @Override - public ReadableState readLater() { -// TODO: support prefetch. -return this; - } -}; - } - - @Override - public Iterable read() { -return new BagStateIterable(elemIndex); - } - - @Override - public BagState rea
[22/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduce their visibility to package private.
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java deleted file mode 100644 index a26472c..000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * JStorm {@link Executor} for {@link DoFn} with multi-output. - * @param - * @param - */ -public class MultiOutputDoFnExecutorextends DoFnExecutor { - private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class); - - /** - * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated - * tag is used in downstream consumer. So before output, we need to map this "local" tag to - * "external" tag. See PCollectionTuple for details. 
- */ - public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager { -@Override -public void output(TupleTag tag, WindowedValue output) { - if (localTupleTagMap.containsKey(tag)) { -executorsBolt.processExecutorElem((TupleTag) localTupleTagMap.get(tag), output); - } else { -executorsBolt.processExecutorElem(tag, output); - } -} - } - - protected Map localTupleTagMap; - - public MultiOutputDoFnExecutor( - String stepName, - String description, - JStormPipelineOptions pipelineOptions, - DoFn doFn, - Coder inputCoder, - WindowingStrategy windowingStrategy, - TupleTag mainInputTag, - Collection sideInputs, - Map sideInputTagToView, - TupleTag mainTupleTag, - List sideOutputTags, - Map localTupleTagMap - ) { -super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, -sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); -this.localTupleTagMap = localTupleTagMap; -this.outputManager = new MultiOutputDoFnExecutorOutputManager(); -LOG.info("localTupleTagMap: {}", localTupleTagMap); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java deleted file mode 100644 index 5e87cff..000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You
[03/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java new file mode 100644 index 000..f101beb --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime.state; + +import com.alibaba.jstorm.cache.IKvStore; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.ReadableState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +public class JStormMapStateimplements MapState { +private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class); + +private final K key; +private final StateNamespace namespace; +private IKvStore kvStore; + +public JStormMapState(K key, StateNamespace namespace, IKvStore kvStore) { +this.key = key; +this.namespace = namespace; +this.kvStore = kvStore; +} + +@Override +public void put(K var1, V var2) { +try { +kvStore.put(var1, var2); +} catch (IOException e) { +reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e); +} +} + +@Override +public ReadableState putIfAbsent(K var1, V var2) { +ReadableState ret = null; +try { +V value = kvStore.get(var1); +if (value == null) { +kvStore.put(var1, var2); +ret = new MapReadableState<>(null); +} else { +ret = new MapReadableState<>(value); +} +} catch (IOException e) { +reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e); +} +return ret; +} + +@Override +public void remove(K var1) { +try { +kvStore.remove(var1); +} catch (IOException e) { +reportError(String.format("Failed to remove key=%s", var1), e); +} +} + +@Override +public ReadableState get(K var1) { +ReadableState ret = new MapReadableState<>(null); +try { +ret = new MapReadableState(kvStore.get(var1)); +} catch (IOException e) { +reportError(String.format("Failed to get value for key=%s", var1), e); +} +return ret; +} + +@Override +public ReadableState keys() { +ReadableState ret = new MapReadableState<>(null); +try { +ret = new MapReadableState<>(kvStore.keys()); +} catch (IOException e) { +reportError(String.format("Failed 
to get keys"), e); +} +return ret; +} + +@Override +public ReadableState values() { +ReadableState ret = new MapReadableState<>(null); +try { +ret = new MapReadableState<>(kvStore.values()); +} catch (IOException e) { +reportError(String.format("Failed to get values"), e); +} +return ret; +} + +@Override +public ReadableState >> entries() { +ReadableState >> ret = new MapReadableState<>(null); +try { +ret = new MapReadableState<>(kvStore.entries()); +} catch (IOException e) { +reportError(String.format("Failed to get values"), e); +} +return ret; +} + +@Override +public void clear() { +try { +Iterable keys = kvStore.keys(); +kvStore.removeBatch(keys); +} catch (IOException e) { +
[21/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduce their visibility to package private.
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java deleted file mode 100644 index 6e3392c..000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.jstorm.translation.translator; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; -import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor; -import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.PValueBase; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}. 
- */ -public class ParDoBoundMultiTranslator-extends TransformTranslator.Default > { - - @Override - public void translateNode( - ParDo.MultiOutput transform, TranslationContext context) { -final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); -final TupleTag inputTag = (TupleTag) userGraphContext.getInputTag(); -PCollection input = (PCollection) userGraphContext.getInput(); - -Map allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); -Map localToExternalTupleTagMap = Maps.newHashMap(); -for (Map.Entry entry : allOutputs.entrySet()) { - Iterator itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); - localToExternalTupleTagMap.put(entry.getKey(), itr.next()); -} - -TupleTag mainOutputTag = (TupleTag) userGraphContext.getOutputTag(); -List sideOutputTags = userGraphContext.getOutputTags(); -sideOutputTags.remove(mainOutputTag); - -Map allInputs = Maps.newHashMap(userGraphContext.getInputs()); -for (PCollectionView pCollectionView : transform.getSideInputs()) { - allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); -} -String description = describeTransform( -transform, -allInputs, -allOutputs); - -ImmutableMap.Builder sideInputTagToView = ImmutableMap.builder(); -for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); -} - -DoFnExecutor executor; -DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); -if (signature.stateDeclarations().size() > 0 -|| signature.timerDeclarations().size() > 0) { - executor = new MultiStatefulDoFnExecutor<>( - userGraphContext.getStepName(), - description, - userGraphContext.getOptions(), - (DoFn ) transform.getFn(), - (Coder) WindowedValue.getFullCoder(
[25/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduce their visibility to package private.
jstorm-runner: move most classes to translation package and reduece their visibility to package private. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82653534 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82653534 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82653534 Branch: refs/heads/jstorm-runner Commit: 82653534b0b738ee84ed94a67f9344393778d033 Parents: 9309ac4 Author: Pei He <p...@apache.org> Authored: Fri Jul 14 15:28:53 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:57 2017 +0800 -- .../beam/runners/jstorm/JStormRunner.java | 14 +- .../jstorm/serialization/package-info.java | 22 ++ .../jstorm/translation/AbstractComponent.java | 67 .../translation/BoundedSourceTranslator.java| 49 +++ .../jstorm/translation/CommonInstance.java | 28 ++ .../jstorm/translation/DefaultStepContext.java | 90 + .../jstorm/translation/DoFnExecutor.java| 339 + .../translation/DoFnRunnerWithMetrics.java | 91 + .../runners/jstorm/translation/Executor.java| 36 ++ .../jstorm/translation/ExecutorContext.java | 41 ++ .../jstorm/translation/ExecutorsBolt.java | 338 + .../jstorm/translation/FlattenExecutor.java | 60 +++ .../jstorm/translation/FlattenTranslator.java | 49 +++ .../translation/GroupByKeyTranslator.java | 71 .../translation/GroupByWindowExecutor.java | 173 + .../jstorm/translation/JStormBagState.java | 180 + .../translation/JStormCombiningState.java | 88 + .../jstorm/translation/JStormMapState.java | 158 .../translation/JStormPipelineTranslator.java | 2 - .../translation/JStormStateInternals.java | 190 ++ .../translation/JStormTimerInternals.java | 97 + .../jstorm/translation/JStormValueState.java| 82 .../translation/JStormWatermarkHoldState.java | 82 .../jstorm/translation/MetricsReporter.java | 87 + .../translation/MultiOutputDoFnExecutor.java| 79 .../translation/MultiStatefulDoFnExecutor.java | 70 .../translation/ParDoBoundMultiTranslator.java | 114 ++ 
.../translation/ParDoBoundTranslator.java | 107 ++ .../runners/jstorm/translation/RunnerUtils.java | 51 +++ .../translation/SerializedPipelineOptions.java | 65 .../translation/SingletonKeyedWorkItem.java | 62 +++ .../translation/StatefulDoFnExecutor.java | 68 .../beam/runners/jstorm/translation/Stream.java | 104 + .../jstorm/translation/TimerService.java| 51 +++ .../jstorm/translation/TimerServiceImpl.java| 155 .../jstorm/translation/TransformTranslator.java | 79 .../jstorm/translation/TranslationContext.java | 6 - .../jstorm/translation/TranslatorRegistry.java | 11 +- .../jstorm/translation/TxExecutorsBolt.java | 133 +++ .../translation/TxUnboundedSourceSpout.java | 156 .../translation/UnboundedSourceSpout.java | 189 + .../translation/UnboundedSourceTranslator.java | 44 +++ .../jstorm/translation/ViewExecutor.java| 56 +++ .../jstorm/translation/ViewTranslator.java | 378 ++ .../translation/WindowAssignExecutor.java | 112 ++ .../translation/WindowAssignTranslator.java | 41 ++ .../jstorm/translation/package-info.java| 22 ++ .../translation/runtime/AbstractComponent.java | 68 .../translation/runtime/DoFnExecutor.java | 343 - .../runtime/DoFnRunnerWithMetrics.java | 91 - .../jstorm/translation/runtime/Executor.java| 36 -- .../translation/runtime/ExecutorContext.java| 41 -- .../translation/runtime/ExecutorsBolt.java | 339 - .../translation/runtime/FlattenExecutor.java| 60 --- .../runtime/GroupByWindowExecutor.java | 177 - .../translation/runtime/MetricsReporter.java| 87 - .../runtime/MultiOutputDoFnExecutor.java| 79 .../runtime/MultiStatefulDoFnExecutor.java | 72 .../runtime/StatefulDoFnExecutor.java | 70 .../translation/runtime/TimerService.java | 51 --- .../translation/runtime/TimerServiceImpl.java | 155 .../translation/runtime/TxExecutorsBolt.java| 133 --- .../runtime/TxUnboundedSourceSpout.java | 156 .../runtime/UnboundedSourceSpout.java | 191 -- .../translation/runtime/ViewExecutor.java | 56 --- .../runtime/WindowAssignExecutor.java | 112 -- 
.../runtime/state/JStormBagState.java | 180 - .../runtime/state/JStormCombiningState.java | 88 - .../runtime/state/JStormMapState.java | 158 .../runtime/state/JStormStateInternals.ja
[14/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java index d907fac..6d6f1c6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java @@ -17,15 +17,17 @@ */ package org.apache.beam.runners.jstorm.translation; -import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import java.util.List; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; +import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.View; @@ -34,144 +36,151 @@ import org.apache.beam.sdk.values.PValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator; - -import java.util.List; - /** * Pipleline translator of Storm */ public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { -private static final Logger LOG = 
LoggerFactory.getLogger(StormPipelineTranslator.class); -private TranslationContext context; -private int depth = 0; - -public StormPipelineTranslator(TranslationContext context) { -this.context = context; -} - -public void translate(Pipeline pipeline) { -List transformOverrides = -ImmutableList.builder() - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class), -new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class), -new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class), -new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class), -new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class), -new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class))) - .add(PTransformOverride.of(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), - new ReflectiveOneToOneOverrideFactory((ViewTranslator.CombineGloballyAsSingletonView.class -.build(); -pipeline.replaceAll(transformOverrides); -pipeline.traverseTopologically(this); -} - -@Override -public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { -LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node); -this.depth++; - -// check if current composite transforms need to be translated. -// If not, all sub transforms will be translated in visitPrimitiveTransform. 
-PTransform transform = node.getTransform(); -if (transform != null) { -TransformTranslator translator = TranslatorRegistry.getTranslator(transform); - -if (translator != null && applyCanTranslate(transform, node, translator)) { -applyStreamingTransform(transform, node, translator); -LOG.info(genSpaces(this.depth) + "translated-" + node); -return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; -} -} -return CompositeBehavior.ENTER_TRANSFORM; -} - -public void leaveCompositeTransform(TransformHierarchy.Node node) { -this.depth--; -
[11/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java index c487578..77e4381 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java @@ -18,94 +18,101 @@ package org.apache.beam.runners.jstorm.translation.translator; import avro.shaded.com.google.common.collect.Maps; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor; import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.*; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; 
+import org.apache.beam.sdk.values.PValueBase; +import org.apache.beam.sdk.values.TupleTag; /** * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}. */ public class ParDoBoundMultiTranslator-extends TransformTranslator.Default > { - -@Override -public void translateNode(ParDo.MultiOutput transform, TranslationContext context) { -final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); -final TupleTag inputTag = (TupleTag) userGraphContext.getInputTag(); -PCollection input = (PCollection) userGraphContext.getInput(); +extends TransformTranslator.Default > { -Map allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); -Map localToExternalTupleTagMap = Maps.newHashMap(); -for (Map.Entry entry : allOutputs.entrySet()) { -Iterator itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); -localToExternalTupleTagMap.put(entry.getKey(), itr.next()); -} + @Override + public void translateNode( + ParDo.MultiOutput transform, TranslationContext context) { +final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); +final TupleTag inputTag = (TupleTag) userGraphContext.getInputTag(); +PCollection input = (PCollection) userGraphContext.getInput(); -TupleTag mainOutputTag = (TupleTag) userGraphContext.getOutputTag(); -List sideOutputTags = userGraphContext.getOutputTags(); -sideOutputTags.remove(mainOutputTag); +Map allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); +Map localToExternalTupleTagMap = Maps.newHashMap(); +for (Map.Entry entry : allOutputs.entrySet()) { + Iterator itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); + localToExternalTupleTagMap.put(entry.getKey(), itr.next()); +} -Map allInputs = Maps.newHashMap(userGraphContext.getInputs()); -for (PCollectionView pCollectionView : transform.getSideInputs()) { -allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); -} -String description = describeTransform( -transform, 
-allInputs, -allOutputs); +TupleTag mainOutputTag = (TupleTag) userGraphContext.getOutputTag(); +List sideOutputTags = userGraphContext.getOutputTags(); +sideOutputTags.remove(mainOutputTag); -ImmutableMap.Builder sideInputTagToView = ImmutableMap.builder(); -for
[29/53] [abbrv] beam git commit: jstorm-runner: update test runner of integration test in POM
jstorm-runner: update test runner of integration test in POM Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df154de2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df154de2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df154de2 Branch: refs/heads/jstorm-runner Commit: df154de20a216de4b997e0821dcf64cd553965ac Parents: 9abbbd0 Author: basti.lj <basti...@alibaba-inc.com> Authored: Fri Jul 14 16:53:08 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:58 2017 +0800 -- runners/jstorm/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/df154de2/runners/jstorm/pom.xml -- diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml index 939f789..cdfaafb 100644 --- a/runners/jstorm/pom.xml +++ b/runners/jstorm/pom.xml @@ -73,7 +73,7 @@ [ -"--runner=com.alibaba.jstorm.beam.TestJStormRunner" + "--runner=org.apache.beam.runners.jstorm.TestJStormRunner" ]
[53/53] [abbrv] beam git commit: This closes #3734
This closes #3734 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e00e0e84 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e00e0e84 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e00e0e84 Branch: refs/heads/jstorm-runner Commit: e00e0e841e19d427377bd576f26b7fbf62c3b9fb Parents: 0a05de3 26bcdf3 Author: Pei He <p...@apache.org> Authored: Sun Aug 20 22:59:43 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sun Aug 20 22:59:43 2017 +0800 -- ...ostCommit_Java_ValidatesRunner_JStorm.groovy | 43 + runners/jstorm/pom.xml | 58 +- .../runners/jstorm/JStormPipelineOptions.java | 50 ++ .../beam/runners/jstorm/JStormRunner.java | 383 - .../runners/jstorm/JStormRunnerRegistrar.java | 41 +- .../beam/runners/jstorm/JStormRunnerResult.java | 78 +- .../beam/runners/jstorm/TestJStormRunner.java | 162 .../BeamSdkRepackUtilsSerializer.java | 287 +++ .../serialization/BeamUtilsSerializer.java | 114 +++ .../serialization/GuavaUtilsSerializer.java | 286 +++ .../serialization/JStormUtilsSerializer.java| 126 +++ .../serialization/JavaUtilsSerializer.java | 235 ++ .../jstorm/serialization/package-info.java | 22 + .../jstorm/translation/AbstractComponent.java | 67 ++ .../translation/BoundedSourceTranslator.java| 48 ++ .../jstorm/translation/CommonInstance.java | 28 + .../jstorm/translation/DefaultStepContext.java | 90 ++ .../jstorm/translation/DoFnExecutor.java| 348 .../translation/DoFnRunnerWithMetrics.java | 91 ++ .../runners/jstorm/translation/Executor.java| 42 + .../jstorm/translation/ExecutorContext.java | 41 + .../jstorm/translation/ExecutorsBolt.java | 366 .../jstorm/translation/FlattenExecutor.java | 67 ++ .../jstorm/translation/FlattenTranslator.java | 153 .../translation/GroupByKeyTranslator.java | 59 ++ .../translation/GroupByWindowExecutor.java | 204 + .../translation/JStormPipelineTranslator.java | 184 + .../translation/JStormStateInternals.java | 824 +++ 
.../translation/JStormTimerInternals.java | 96 +++ .../jstorm/translation/MetricsReporter.java | 86 ++ .../translation/MultiOutputDoFnExecutor.java| 55 ++ .../translation/MultiStatefulDoFnExecutor.java | 69 ++ .../translation/ParDoBoundMultiTranslator.java | 104 +++ .../translation/SerializedPipelineOptions.java | 65 ++ .../translation/StatefulDoFnExecutor.java | 67 ++ .../beam/runners/jstorm/translation/Stream.java | 104 +++ .../jstorm/translation/TimerService.java| 53 ++ .../jstorm/translation/TimerServiceImpl.java| 164 .../jstorm/translation/TransformTranslator.java | 85 ++ .../jstorm/translation/TranslationContext.java | 472 +++ .../jstorm/translation/TranslatorRegistry.java | 57 ++ .../jstorm/translation/TxExecutorsBolt.java | 133 +++ .../translation/TxUnboundedSourceSpout.java | 156 .../translation/UnboundedSourceSpout.java | 203 + .../translation/UnboundedSourceTranslator.java | 45 + .../jstorm/translation/ViewExecutor.java| 56 ++ .../jstorm/translation/ViewTranslator.java | 376 + .../translation/WindowAssignExecutor.java | 110 +++ .../translation/WindowAssignTranslator.java | 41 + .../jstorm/translation/package-info.java| 22 + .../jstorm/JStormRunnerRegistrarTest.java | 4 +- .../translation/JStormStateInternalsTest.java | 221 + runners/jstorm/src/test/resources/logback.xml | 42 + runners/pom.xml | 2 +- .../beam/sdk/transforms/ReshuffleTest.java | 22 +- sdks/pom.xml| 2 +- 56 files changed, 7351 insertions(+), 58 deletions(-) --
[40/53] [abbrv] beam git commit: jstorm-runner: VM crashes during ValidatesRunner tests; disable reuse of forked VM.
jstorm-runner: VM crashes during ValidatesRunner tests; disable reuse of forked VM. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d634ecf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d634ecf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d634ecf Branch: refs/heads/jstorm-runner Commit: 4d634ecf008e7fe6d1a99770da8bb66f7513b7e0 Parents: 588a698 Author: Pei He <p...@apache.org> Authored: Wed Jul 19 14:36:30 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:59 2017 +0800 -- runners/jstorm/pom.xml | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/4d634ecf/runners/jstorm/pom.xml -- diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml index 5d54d94..79634e9 100644 --- a/runners/jstorm/pom.xml +++ b/runners/jstorm/pom.xml @@ -67,6 +67,7 @@ org.apache.beam.sdk.testing.UsesTestStream none + false true org.apache.beam:beam-sdks-java-core
[45/53] [abbrv] beam git commit: jstorm-runner: Fix duplicated update of metric counter
jstorm-runner: Fix duplicated update of metric counter Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/240f61bc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/240f61bc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/240f61bc Branch: refs/heads/jstorm-runner Commit: 240f61bc6baab9e698b2b6144688853a8371658f Parents: 61e9fa6 Author: basti.lj <basti...@alibaba-inc.com> Authored: Wed Aug 9 16:03:50 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:00 2017 +0800 -- .../org/apache/beam/runners/jstorm/translation/MetricsReporter.java | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/240f61bc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java index 82d8bdc..5b60b03 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java @@ -74,6 +74,7 @@ class MetricsReporter { AsmCounter counter = metricClient.registerCounter(metricName); Long incValue = (oldValue == null ? updateValue : updateValue - oldValue); counter.update(incValue); +reportedCounters.put(metricName, incValue); } } }
[31/53] [abbrv] beam git commit: jstorm-runner: upgrade to Beam version 2.1.0-SNAPSHOT.
jstorm-runner: upgrade to Beam version 2.1.0-SNAPSHOT. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5af6d2f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5af6d2f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5af6d2f Branch: refs/heads/jstorm-runner Commit: a5af6d2f420adebaf18f0a9e7367d392327b60e2 Parents: 30f3eda Author: Pei He <p...@apache.org> Authored: Tue Jul 18 14:29:19 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:58 2017 +0800 -- runners/jstorm/pom.xml | 5 - 1 file changed, 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/a5af6d2f/runners/jstorm/pom.xml -- diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml index cdfaafb..9808cd2 100644 --- a/runners/jstorm/pom.xml +++ b/runners/jstorm/pom.xml @@ -32,7 +32,6 @@ 2.5.0-SNAPSHOT -2.0.0 @@ -99,7 +98,6 @@ org.apache.beam beam-sdks-java-core - ${beam.version} @@ -112,7 +110,6 @@ org.apache.beam beam-runners-core-java - ${beam.version} @@ -125,7 +122,6 @@ org.apache.beam beam-runners-core-construction-java - ${beam.version} @@ -150,7 +146,6 @@ org.apache.beam beam-sdks-java-core - ${beam.version} tests test
[30/53] [abbrv] beam git commit: jstorm-runner: support deleteTimer in JStormTimerInternals.
jstorm-runner: support deleteTimer in JStormTimerInternals. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/18198330 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/18198330 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/18198330 Branch: refs/heads/jstorm-runner Commit: 18198330d42a13d3d8dd96cccdbd07ba077b9408 Parents: af5221c Author: Pei He <p...@apache.org> Authored: Tue Jul 18 20:07:19 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:58 2017 +0800 -- .../runners/jstorm/translation/JStormTimerInternals.java| 3 +-- .../beam/runners/jstorm/translation/TimerService.java | 2 ++ .../beam/runners/jstorm/translation/TimerServiceImpl.java | 9 + 3 files changed, 12 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java index 4c96541..0e9ee35 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java @@ -69,8 +69,7 @@ class JStormTimerInternals implements TimerInternals { @Override @Deprecated public void deleteTimer(TimerData timerData) { -throw new UnsupportedOperationException( -"Canceling of a timer is not yet supported."); +timerService.deleteTimer(timerData); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java index 29345aa..24a9050 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java @@ -48,4 +48,6 @@ interface TimerService extends Serializable { void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor); void fireTimers(long newWatermark); + + void deleteTimer(TimerInternals.TimerData timerData); } http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java index c2600e5..6b463db 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java @@ -152,4 +152,13 @@ class TimerServiceImpl implements TimerService { keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); timerDataToKeyedExecutors.put(timerData, keyedExecutors); } + + @Override + public void deleteTimer(TimerInternals.TimerData timerData) { +checkArgument( +TimeDomain.EVENT_TIME.equals(timerData.getDomain()), +String.format("Does not support domain: %s.", timerData.getDomain())); +eventTimeTimersQueue.remove(timerData); +timerDataToKeyedExecutors.remove(timerData); + } }
[01/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
Repository: beam Updated Branches: refs/heads/jstorm-runner 0a05de365 -> e00e0e841 http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java -- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java new file mode 100644 index 000..548fb20 --- /dev/null +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java @@ -0,0 +1,302 @@ +package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.runners.jstorm.StormPipelineOptions; +import org.apache.beam.runners.jstorm.TestJStormRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.experimental.categories.Category; 
+import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +@RunWith(JUnit4.class) +public class CoGroupByKeyTest implements Serializable { +/** + * Converts the given list into a PCollection belonging to the provided + * Pipeline in such a way that coder inference needs to be performed. + */ +private PCollection> createInput(String name, + Pipeline p, List > list) { +return createInput(name, p, list, new ArrayList()); +} + +/** + * Converts the given list with timestamps into a PCollection. + */ +private PCollection > createInput(String name, + Pipeline p, List > list, List timestamps) { +PCollection > input; +if (timestamps.isEmpty()) { +input = p.apply("Create" + name, Create.of(list) +.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of(; +} else { +input = p.apply("Create" + name, Create.timestamped(list, timestamps) +.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of(; +} +return input.apply( +"Identity" + name, +ParDo.of( +new DoFn , KV >() { +@ProcessElement +public void processElement(ProcessContext c) { +c.output(c.element()); +} +})); +} + +/** + * Returns a {@code PCollection >} containing the result + * of a {@link CoGroupByKey} over 2 {@code PCollection >}, + * where each {@link PCollection} has no duplicate keys and the key sets of + * each {@link PCollection} are intersecting but neither is a subset of the other. + */ +private PCollection > buildGetOnlyGbk( +Pipeline p, +TupleTag tag1, +TupleTag tag2) { +List > list1 = +Arrays.asList( +KV.of(1, "collection1-1"), +KV.of(2, "collection1-2")); +List > list2 = +Arrays.asList( +KV.of(2, "collection2-2"), +KV.of(3, "collection2-3")); +
[06/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java -- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java deleted file mode 100644 index 1870681..000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.translator; - -import avro.shaded.com.google.common.collect.Lists; -import avro.shaded.com.google.common.collect.Maps; -import com.alibaba.jstorm.beam.translation.TranslationContext; -import com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor; -import com.alibaba.jstorm.beam.translation.runtime.MultiOutputDoFnExecutor; -import com.alibaba.jstorm.beam.translation.runtime.MultiStatefulDoFnExecutor; -import com.google.common.collect.ImmutableList; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.*; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * Translates a ParDo.BoundMulti to a Storm {@link com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor}. 
- */ -public class ParDoBoundMultiTranslator-extends TransformTranslator.Default > { - -@Override -public void translateNode(ParDo.MultiOutput transform, TranslationContext context) { -final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); -final TupleTag inputTag = (TupleTag) userGraphContext.getInputTag(); -PCollection input = (PCollection) userGraphContext.getInput(); - -Map allOutputs = Maps.newHashMap(userGraphContext.getOutputs()); -Map localToExternalTupleTagMap = Maps.newHashMap(); -for (Map.Entry entry : allOutputs.entrySet()) { -Iterator itr = ((PValueBase) entry.getValue()).expand().keySet().iterator(); -localToExternalTupleTagMap.put(entry.getKey(), itr.next()); -} - -TupleTag mainOutputTag = (TupleTag) userGraphContext.getOutputTag(); -List sideOutputTags = userGraphContext.getOutputTags(); -sideOutputTags.remove(mainOutputTag); - -Map allInputs = Maps.newHashMap(userGraphContext.getInputs()); -for (PCollectionView pCollectionView : transform.getSideInputs()) { -allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); -} -String description = describeTransform( -transform, -allInputs, -allOutputs); - -ImmutableMap.Builder sideInputTagToView = ImmutableMap.builder(); -for (PCollectionView pCollectionView : transform.getSideInputs()) { - sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView); -} - -DoFnExecutor executor; -DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); -if (signature.stateDeclarations().size() > 0 -|| signature.timerDeclarations().size() > 0) { -executor = new MultiStatefulDoFnExecutor<>( -userGraphContext.getStepName(), -description, -userGraphContext.getOptions(), -
[35/53] [abbrv] beam git commit: jstorm-runner: 1. Add kryo serializer for Collections.SingletonLists 2. Fix concurrent problem of elementIndex of JStormBagState
jstorm-runner: 1. Add kryo serializer for Collections.SingletonLists 2. Fix concurrent problem of elementIndex of JStormBagState Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/201ef722 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/201ef722 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/201ef722 Branch: refs/heads/jstorm-runner Commit: 201ef722ec36b0ffa8197722fdf898fb9978803c Parents: 1bf3224 Author: basti.lj <basti...@alibaba-inc.com> Authored: Wed Jul 19 20:15:56 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:59 2017 +0800 -- .../beam/runners/jstorm/JStormRunner.java | 2 + .../serialization/CollectionsSerializer.java| 43 +++ .../jstorm/translation/ExecutorsBolt.java | 2 +- .../translation/JStormStateInternals.java | 44 +--- 4 files changed, 75 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 286a975..56db1c6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -31,6 +31,7 @@ import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.transactional.TransactionTopologyBuilder; import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.jstorm.serialization.CollectionsSerializer; import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer; import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer; import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer; @@ -105,6 +106,7 @@ public class JStormRunner extends PipelineRunner { 
SdkRepackImmuSetSerializer.registerSerializers(config); ImmutableMapSerializer.registerSerializers(config); SdkRepackImmutableMapSerializer.registerSerializers(config); +CollectionsSerializer.registerSerializers(config); config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class); return config; http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java new file mode 100644 index 000..0548196 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java @@ -0,0 +1,43 @@ +package org.apache.beam.runners.jstorm.serialization; + +import backtype.storm.Config; +import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; +import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; + +import java.util.Collections; +import java.util.List; + + +/** + * Specific serializer of {@link Kryo} for Collections. + */ +public class CollectionsSerializer { + + /** + * Specific {@link Kryo} serializer for {@link java.util.Collections.SingletonList}. 
+ */ + public static class CollectionsSingletonListSerializer extends Serializer<List> { +public CollectionsSingletonListSerializer() { + setImmutable(true); +} + +@Override +public List read(final Kryo kryo, final Input input, final Class<List> type) { + final Object obj = kryo.readClassAndObject(input); + return Collections.singletonList(obj); +} + +@Override +public void write(final Kryo kryo, final Output output, final List list) { + kryo.writeClassAndObject(output, list.get(0)); +} + + } + + public static void registerSerializers(Config config) { +config.registerSerialization(Collections.singletonList("").getClass(), +CollectionsSingletonListSerializer.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/201ef722/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apac
[42/53] [abbrv] beam git commit: jstorm-runner: Support multiple copies of Flatten
jstorm-runner: Support multiple copies of Flatten Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1178f9fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1178f9fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1178f9fb Branch: refs/heads/jstorm-runner Commit: 1178f9fb957c7e6cf1b277696ff63dc0e29a6d5e Parents: 52913b7 Author: basti.lj <basti...@alibaba-inc.com> Authored: Thu Jul 20 20:04:24 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:00 2017 +0800 -- .../runners/jstorm/translation/FlattenExecutor.java | 12 ++-- .../jstorm/translation/FlattenTranslator.java| 15 +-- 2 files changed, 23 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/1178f9fb/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java index a64f494..928fa24 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.jstorm.translation; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Map; + import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; @@ -32,8 +34,11 @@ class FlattenExecutor implements Executor { private TupleTag mainOutputTag; private ExecutorContext context; private ExecutorsBolt executorsBolt; + private final Map<TupleTag, Integer> tagToCopyNum; - public FlattenExecutor(String description, TupleTag mainTupleTag) { + public FlattenExecutor(String description, TupleTag mainTupleTag, + Map<TupleTag, Integer> tagToCopyNum) { +this.tagToCopyNum = 
checkNotNull(tagToCopyNum, "tagToCopyNum"); this.description = checkNotNull(description, "description"); this.mainOutputTag = mainTupleTag; } @@ -46,7 +51,10 @@ class FlattenExecutor implements Executor { @Override public void process(TupleTag tag, WindowedValue elem) { -executorsBolt.processExecutorElem(mainOutputTag, elem); +int copyNum = tagToCopyNum.get(tag); +for (int i = 0; i < copyNum; i++) { + executorsBolt.processExecutorElem(mainOutputTag, elem); +} } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1178f9fb/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java index e104ad8..b96bc56 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java @@ -48,9 +48,19 @@ class FlattenTranslator extends TransformTranslator.Default, PValue> inputs = Maps.newHashMap(); +Map<TupleTag, Integer> tagToCopyNum = Maps.newHashMap(); for (Map.Entry<TupleTag, PValue> entry : userGraphContext.getInputs().entrySet()) { PCollection pc = (PCollection) entry.getValue(); - inputs.putAll(pc.expand()); + //inputs.putAll(pc.expand()); + for (Map.Entry<TupleTag, PValue> entry1 : pc.expand().entrySet()) { +if (inputs.containsKey(entry1.getKey())) { + int copyNum = tagToCopyNum.get(entry1.getKey()); + tagToCopyNum.put(entry1.getKey(), ++copyNum); +} else { + inputs.put(entry1.getKey(), entry1.getValue()); + tagToCopyNum.put(entry1.getKey(), 1); +} + } } String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); @@ -67,7 +77,8 @@ class FlattenTranslator extends TransformTranslator.Default
[41/53] [abbrv] beam git commit: jstorm-runner: Fixup for review comments
jstorm-runner: Fixup for review comments Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/90ed2ef3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/90ed2ef3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/90ed2ef3 Branch: refs/heads/jstorm-runner Commit: 90ed2ef344d19ca730429e9eb7c71779f995fc47 Parents: 6078cbc Author: basti.lj <basti...@alibaba-inc.com> Authored: Mon Aug 14 16:20:03 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:00 2017 +0800 -- .../runners/jstorm/JStormPipelineOptions.java | 12 +-- .../beam/runners/jstorm/JStormRunner.java | 21 ++-- .../beam/runners/jstorm/JStormRunnerResult.java | 21 ++-- .../beam/runners/jstorm/TestJStormRunner.java | 19 +++- .../serialization/JavaUtilsSerializer.java | 3 +- .../translation/BoundedSourceTranslator.java| 4 +- .../jstorm/translation/DoFnExecutor.java| 27 +++-- .../runners/jstorm/translation/Executor.java| 6 ++ .../jstorm/translation/ExecutorsBolt.java | 8 +- .../jstorm/translation/FlattenExecutor.java | 1 - .../jstorm/translation/FlattenTranslator.java | 23 ++-- .../translation/GroupByKeyTranslator.java | 12 --- .../translation/GroupByWindowExecutor.java | 12 --- .../translation/JStormStateInternals.java | 29 +++-- .../jstorm/translation/MetricsReporter.java | 2 - .../translation/MultiOutputDoFnExecutor.java| 22 +--- .../translation/MultiStatefulDoFnExecutor.java | 5 +- .../translation/ParDoBoundMultiTranslator.java | 14 +-- .../translation/ParDoBoundTranslator.java | 108 --- .../translation/StatefulDoFnExecutor.java | 1 - .../jstorm/translation/TimerService.java| 2 +- .../jstorm/translation/TimerServiceImpl.java| 8 +- .../jstorm/translation/TransformTranslator.java | 16 ++- .../jstorm/translation/TranslationContext.java | 18 +++- .../jstorm/translation/TranslatorRegistry.java | 1 - .../translation/UnboundedSourceSpout.java | 8 +- .../jstorm/translation/ViewTranslator.java | 4 +- 
.../translation/WindowAssignExecutor.java | 2 - .../translation/JStormStateInternalsTest.java | 2 +- 29 files changed, 141 insertions(+), 270 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java index 114877a..e494757 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java @@ -36,8 +36,8 @@ public interface JStormPipelineOptions extends PipelineOptions { @Description("Executing time(sec) of topology on local mode. Default is 1min.") @Default.Long(60) - Long getLocalModeExecuteTime(); - void setLocalModeExecuteTime(Long time); + Long getLocalModeExecuteTimeSec(); + void setLocalModeExecuteTimeSec(Long time); @Description("Worker number of topology") @Default.Integer(1) @@ -46,8 +46,8 @@ public interface JStormPipelineOptions extends PipelineOptions { @Description("Global parallelism number of a component") @Default.Integer(1) - Integer getParallelismNumber(); - void setParallelismNumber(Integer number); + Integer getParallelism(); + void setParallelism(Integer number); @Description("System topology config of JStorm") @Default.InstanceFactory(DefaultMapValueFactory.class) @@ -61,8 +61,8 @@ public interface JStormPipelineOptions extends PipelineOptions { @Description("Parallelism number of a specified composite PTransform") @Default.InstanceFactory(DefaultMapValueFactory.class) - Map getParallelismNumMap(); - void setParallelismNumMap(Map parallelismNumMap); + Map getParallelismMap(); + void setParallelismMap(Map parallelismNumMap); /** * Default value factory for topology configuration of JStorm. 
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 47de42c..21a8fae 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/
[37/53] [abbrv] beam git commit: jstorm-runner: support Flatten with empty inputs.
jstorm-runner: support Flatten with empty inputs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aca16cc9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aca16cc9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aca16cc9 Branch: refs/heads/jstorm-runner Commit: aca16cc9b2224b9bfce98719c6ef2abbad94f7df Parents: 4d634ec Author: Pei He <p...@apache.org> Authored: Wed Jul 19 15:34:56 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:59 2017 +0800 -- .../jstorm/translation/FlattenTranslator.java | 104 ++- 1 file changed, 100 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/aca16cc9/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java index 89708df..8f239bf 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java @@ -18,11 +18,24 @@ package org.apache.beam.runners.jstorm.translation; import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; +import 
org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; /** * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}. @@ -40,10 +53,93 @@ class FlattenTranslator extends TransformTranslator.Default pc = (PCollection) entry.getValue(); inputs.putAll(pc.expand()); } -System.out.println("Real inputs: " + inputs); -System.out.println("FlattenList inputs: " + userGraphContext.getInputs()); String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); -FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); -context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); + +if (inputs.isEmpty()) { + // Create a empty source + TupleTag tag = userGraphContext.getOutputTag(); + PValue output = userGraphContext.getOutput(); + + UnboundedSourceSpout spout = new UnboundedSourceSpout( + description, + new EmptySource(), + userGraphContext.getOptions(), + tag); + context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output)); + +} else { + FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); + context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); +} + } + + private static class EmptySource extends UnboundedSource<Void, UnboundedSource.CheckpointMark> { +@Override +public List> split( +int i, PipelineOptions pipelineOptions) throws Exception { + return Collections.singletonList(this); +} + +@Override +public UnboundedReader createReader( +PipelineOptions pipelineOptions, +@Nullable CheckpointMark checkpointMark) throws IOException { + return new EmptyReader(); +} + +@Override +public Coder getCheckpointMarkCoder() { + return null; +} + +@Override +public void validate() { +} + +@Override +public Coder getDefaultOutputCoder() { + return VoidCoder.of(); +} + +private class EmptyReader extends UnboundedReader { + @Override + public boolean start() 
throws IOException { +return false; + } + + @Override + public boolean advance() throws IOException { +return false; + } + + @Override + public Void getCurrent() throws NoSuchElementException { +throw new NoSuchElementException(); + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { +throw new NoSuchElementException(); + } + + @Override +
[49/53] [abbrv] beam git commit: jstorm-runner: move jstorm ahead to get build result earlier.
jstorm-runner: move jstorm ahead to get build result earlier. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/26bcdf34 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/26bcdf34 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/26bcdf34 Branch: refs/heads/jstorm-runner Commit: 26bcdf3492feae15cae61aa5325b07c44ce2a310 Parents: 00b9c5c Author: Pei He <p...@apache.org> Authored: Fri Aug 18 17:26:53 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:01 2017 +0800 -- runners/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/26bcdf34/runners/pom.xml -- diff --git a/runners/pom.xml b/runners/pom.xml index 0cdac02..36b8f22 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -36,11 +36,11 @@ <module>core-construction-java</module> <module>core-java</module> <module>direct-java</module> +<module>jstorm</module> <module>flink</module> <module>google-cloud-dataflow-java</module> <module>spark</module> <module>apex</module> -<module>jstorm</module>
[36/53] [abbrv] beam git commit: jstorm-runner: Fix the failure of session window test cases in CombineTest
jstorm-runner: Fix the failure of session window test cases in CombineTest Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52913b7e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52913b7e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52913b7e Branch: refs/heads/jstorm-runner Commit: 52913b7e2b01b4e6c65d96a10d745dd3e6739c83 Parents: 201ef72 Author: basti.lj <basti...@alibaba-inc.com> Authored: Thu Jul 20 14:37:29 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:59 2017 +0800 -- .../jstorm/translation/FlattenTranslator.java | 1 - .../translation/JStormStateInternals.java | 188 +-- 2 files changed, 176 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/52913b7e/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java index 8f239bf..e104ad8 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/52913b7e/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java -- diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java index 68a17e5..90ef6d2 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateBinder; import org.apache.beam.sdk.state.StateContext; import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.Combine; @@ -93,13 +94,14 @@ class JStormStateInternals implements StateInternals { } @Override - public T state(final StateNamespace namespace, StateTag address) { + public T state(final StateNamespace namespace, final StateTag address) { return address.getSpec().bind(address.getId(), new StateBinder() { @Override public ValueState bindValue(String id, StateSpec<ValueState> spec, Coder coder) { try { return new JStormValueState<>( - getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id))); + getStoreId(id), spec, getKey(), namespace, + kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id))); } catch (IOException e) { throw new RuntimeException(); } @@ -109,7 +111,8 @@ class JStormStateInternals implements StateInternals { public BagState bindBag(String id, StateSpec<BagState> spec, Coder elemCoder) { try { return new JStormBagState( - getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)), + getStoreId(id), spec, getKey(), namespace, + kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)), kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id))); } catch (IOException e) { throw new 
RuntimeException(); @@ -129,7 +132,8 @@ class JStormStateInternals implements StateInternals { Coder mapValueCoder) { try { return new JStormMapState<>( - getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id))); + getStoreId(id), spec, (KeyT) getKey(), namespace, + kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id))); } catch (IO
[43/53] [abbrv] beam git commit: ReshuffleTest: replace Iterable equal tests with matchers.
ReshuffleTest: replace Iterable equal tests with matchers. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e182cf75 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e182cf75 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e182cf75 Branch: refs/heads/jstorm-runner Commit: e182cf75eb3d5a4701a98a2f5687cf0ea9d51774 Parents: 1178f9f Author: Pei He <p...@apache.org> Authored: Wed Jul 19 20:38:49 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:00 2017 +0800 -- .../beam/sdk/transforms/ReshuffleTest.java | 22 ++-- 1 file changed, 16 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/e182cf75/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java index 3cd7cf9..0eb8e2d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java @@ -17,7 +17,10 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -70,9 +73,16 @@ public class ReshuffleTest implements Serializable { KV.of("k1", 3), KV.of("k2", 4)); - private static final List<KV<String, Iterable>> GROUPED_TESTABLE_KVS = ImmutableList.of( -KV.of("k1", (Iterable) ImmutableList.of(3)), -KV.of("k2", (Iterable) ImmutableList.of(4))); + private static class AssertThatHasExpectedContents + implements SerializableFunction<Iterable<KV<String, Iterable>>, Void> { +@Override 
+public Void apply(Iterable<KV<String, Iterable>> actual) { + assertThat(actual, containsInAnyOrder( + isKv(is("k1"), containsInAnyOrder(3)), + isKv(is("k2"), containsInAnyOrder(4)))); + return null; +} + } @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -167,7 +177,7 @@ public class ReshuffleTest implements Serializable { PCollection<KV<String, Iterable>> output = input .apply(Reshuffle.<String, Iterable>of()); -PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS); +PAssert.that(output).satisfies(new AssertThatHasExpectedContents()); assertEquals( input.getWindowingStrategy(), @@ -190,7 +200,7 @@ public class ReshuffleTest implements Serializable { PCollection<KV<String, Iterable>> output = input .apply(Reshuffle.<String, Iterable>of()); -PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS); +PAssert.that(output).satisfies(new AssertThatHasExpectedContents()); assertEquals( input.getWindowingStrategy(), @@ -213,7 +223,7 @@ public class ReshuffleTest implements Serializable { PCollection<KV<String, Iterable>> output = input .apply(Reshuffle.<String, Iterable>of()); -PAssert.that(output).containsInAnyOrder(GROUPED_TESTABLE_KVS); +PAssert.that(output).satisfies(new AssertThatHasExpectedContents()); assertEquals( input.getWindowingStrategy(),
[09/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
jstorm-runner: rename the package to org.apache.beam.runners.jstorm. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa654b3f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa654b3f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa654b3f Branch: refs/heads/jstorm-runner Commit: aa654b3f15a242221727d021cf4be676c49bd49b Parents: 6ff07fc Author: Pei He <p...@apache.org> Authored: Thu Jul 13 17:02:21 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 02:40:26 2017 +0800 -- .../jstorm/beam/StormPipelineOptions.java | 72 --- .../com/alibaba/jstorm/beam/StormRegistrar.java | 48 -- .../com/alibaba/jstorm/beam/StormRunner.java| 339 -- .../alibaba/jstorm/beam/TestJStormRunner.java | 122 .../serialization/ImmutableListSerializer.java | 92 --- .../serialization/ImmutableMapSerializer.java | 62 -- .../serialization/ImmutableSetSerializer.java | 72 --- .../KvStoreIterableSerializer.java | 55 -- .../SdkRepackImmuListSerializer.java| 78 --- .../SdkRepackImmuSetSerializer.java | 72 --- .../UnmodifiableCollectionsSerializer.java | 159 - .../translation/StormPipelineTranslator.java| 181 -- .../beam/translation/TranslationContext.java| 425 - .../beam/translation/TranslatorRegistry.java| 76 --- .../translation/runtime/AbstractComponent.java | 71 --- .../translation/runtime/AdaptorBasicBolt.java | 24 - .../translation/runtime/AdaptorBasicSpout.java | 24 - .../beam/translation/runtime/DoFnExecutor.java | 330 -- .../runtime/DoFnRunnerWithMetrics.java | 90 --- .../beam/translation/runtime/Executor.java | 37 -- .../translation/runtime/ExecutorContext.java| 35 -- .../beam/translation/runtime/ExecutorsBolt.java | 332 -- .../translation/runtime/FlattenExecutor.java| 55 -- .../runtime/GroupByWindowExecutor.java | 160 - .../translation/runtime/MetricsReporter.java| 93 --- .../runtime/MultiOutputDoFnExecutor.java| 75 --- .../runtime/MultiStatefulDoFnExecutor.java | 68 -- 
.../runtime/StatefulDoFnExecutor.java | 67 -- .../beam/translation/runtime/TimerService.java | 52 -- .../translation/runtime/TimerServiceImpl.java | 150 - .../translation/runtime/TxExecutorsBolt.java| 131 .../runtime/TxUnboundedSourceSpout.java | 153 - .../runtime/UnboundedSourceSpout.java | 198 -- .../beam/translation/runtime/ViewExecutor.java | 55 -- .../runtime/WindowAssignExecutor.java | 108 .../runtime/state/JStormBagState.java | 178 -- .../runtime/state/JStormCombiningState.java | 88 --- .../runtime/state/JStormMapState.java | 155 - .../runtime/state/JStormStateInternals.java | 192 -- .../runtime/state/JStormValueState.java | 84 --- .../runtime/state/JStormWatermarkHoldState.java | 83 --- .../runtime/timer/JStormTimerInternals.java | 99 --- .../translator/BoundedSourceTranslator.java | 50 -- .../translator/CombineGloballyTranslator.java | 24 - .../translator/CombinePerKeyTranslator.java | 24 - .../translator/FlattenTranslator.java | 49 -- .../translator/GroupByKeyTranslator.java| 69 -- .../translator/ParDoBoundMultiTranslator.java | 114 .../translator/ParDoBoundTranslator.java| 106 .../translator/ReshuffleTranslator.java | 24 - .../beam/translation/translator/Stream.java | 91 --- .../translator/TransformTranslator.java | 77 --- .../translator/UnboundedSourceTranslator.java | 46 -- .../translation/translator/ViewTranslator.java | 374 --- .../translator/WindowAssignTranslator.java | 38 -- .../translator/WindowBoundTranslator.java | 48 -- .../beam/translation/util/CommonInstance.java | 25 - .../util/DefaultSideInputReader.java| 46 -- .../translation/util/DefaultStepContext.java| 89 --- .../alibaba/jstorm/beam/util/RunnerUtils.java | 53 -- .../beam/util/SerializedPipelineOptions.java| 64 -- .../beam/util/SingletonKeyedWorkItem.java | 62 -- .../runners/jstorm/StormPipelineOptions.java| 72 +++ .../beam/runners/jstorm/StormRegistrar.java | 48 ++ .../apache/beam/runners/jstorm/StormRunner.java | 345 ++ .../beam/runners/jstorm/TestJStormRunner.java | 120 
.../serialization/ImmutableListSerializer.java | 92 +++ .../serialization/ImmutableMapSerializer.java | 61 ++ .../serialization/ImmutableSetSerializer.java | 71 +++ .../KvStoreIterableSerializer.java | 55 ++ .../SdkRepackImmuListSerializer.java| 78 +++ .../SdkRepackImmuSetSerializer.java | 71 +++ .../UnmodifiableCollectionsSerializer.jav
[08/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java -- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java deleted file mode 100644 index c3e9805..000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java +++ /dev/null @@ -1,425 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation; - -import avro.shaded.com.google.common.collect.Lists; -import com.alibaba.jstorm.beam.translation.translator.Stream; -import com.alibaba.jstorm.beam.util.RunnerUtils; -import com.google.common.base.Strings; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.PValueBase; -import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.beam.StormPipelineOptions; -import com.alibaba.jstorm.beam.translation.runtime.AdaptorBasicSpout; -import com.alibaba.jstorm.beam.translation.runtime.Executor; -import com.alibaba.jstorm.beam.translation.runtime.ExecutorsBolt; -import com.alibaba.jstorm.beam.translation.util.CommonInstance; - -import java.util.*; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -/** - * Maintains the state necessary during Pipeline translation to build a Storm topology. 
- */ -public class TranslationContext { -private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class); - -private final UserGraphContext userGraphContext; -private final ExecutionGraphContext executionGraphContext; - -public TranslationContext(StormPipelineOptions options) { -this.userGraphContext = new UserGraphContext(options); -this.executionGraphContext = new ExecutionGraphContext(); -} - -public ExecutionGraphContext getExecutionGraphContext() { -return executionGraphContext; -} - -public UserGraphContext getUserGraphContext() { -return userGraphContext; -} - -private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) { -Stream.Producer producer = executionGraphContext.getProducer(input.getValue()); -if (!producer.getComponentId().equals(destComponentName)) { -Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping); -executionGraphContext.registerStreamConsumer(consumer, producer); - -ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId()); -if (executorsBolt != null) { -executorsBolt.addExternalOutputTag(input.getTag()); -} -} -} - -private String getUpstreamExecutorsBolt() { -for (PValue value : userGraphContext.getInputs().values()) { -String componentId = executionGraphContext.getProducerComponentId(value); -if (componentId != null && executionGraphContext.getBolt(componentId) != null) { -return componentId; -} -} -// When upstream component is spout, "null" will be return. -return null; -} - -/** - * check if the current transform is applied to source collection. - * @return - */ -private boolean connectedToSource() { -for (PValue value : userGraphContext.getInputs().values()) { -if (executionGraphContext.producedBySpout(value)) { -return true; -} -} -return false; -} - -/** - *
[46/53] [abbrv] beam git commit: jstorm-runner: 1. Generate execution DAG for runtime 2. Restructure Kryo serializers
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java deleted file mode 100644 index 615ac8b..000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.jstorm.serialization; - -import backtype.storm.Config; -import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; -import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; -import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -/** - * Specific serializer of {@link Kryo} for Unmodifiable Collection. - */ -public class UnmodifiableCollectionsSerializer extends Serializer { - - private static final Field SOURCE_COLLECTION_FIELD; - private static final Field SOURCE_MAP_FIELD; - - static { -try { - SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection") - .getDeclaredField("c"); - SOURCE_COLLECTION_FIELD.setAccessible(true); - - - SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap") - .getDeclaredField("m"); - SOURCE_MAP_FIELD.setAccessible(true); -} catch (final Exception e) { - throw new RuntimeException("Could not access source collection" - + " field in java.util.Collections$UnmodifiableCollection.", e); -} - } - - @Override - public Object read(final Kryo kryo, final Input input, final Class clazz) { -final int ordinal = input.readInt(true); -final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; -final Object sourceCollection = kryo.readClassAndObject(input); -return unmodifiableCollection.create(sourceCollection); - } - - @Override - public void write(final Kryo kryo, final Output output, final Object object) { -try { - final UnmodifiableCollection 
unmodifiableCollection = - UnmodifiableCollection.valueOfType(object.getClass()); - // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") - output.writeInt(unmodifiableCollection.ordinal(), true); - kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object)); -} catch (final RuntimeException e) { - // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... - // handles SerializationException specifically (resizing the buffer)... - throw e; -} catch (final Exception e) { - throw new RuntimeException(e); -} - } - - @Override - public Object copy(Kryo kryo, Object original) { -try { - final UnmodifiableCollection unmodifiableCollection = - UnmodifiableCollection.valueOfType(original.getClass()); - Object sourceCollectionCopy = - kryo.copy(unmodifiableCollection.sourceCollectionField.get(original)); - return unmodifiableCollection.create(sourceCollectionCopy); -} catch (final RuntimeException e) { - // Don't eat and wrap RuntimeExceptions - throw e; -} catch (final Exception e) { - throw new RuntimeException(e); -} - } - - private enum UnmodifiableCollection { -COLLECTION( -Collections.unmodifiableCollection(Arrays.asList("")).getClass(), -SOURCE_COLLECTION_FIELD) { -
[17/53] [abbrv] beam git commit: jstorm-runner: remove ValidatesRunner tests and dead code from jstorm module.
jstorm-runner: remove ValidatesRunner tests and dead code from jstorm module. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ff42cbc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ff42cbc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ff42cbc Branch: refs/heads/jstorm-runner Commit: 4ff42cbc65452ae6259d90f07f2f80423eeb69df Parents: aa251a4 Author: Pei He <p...@apache.org> Authored: Thu Jul 13 18:38:49 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:56 2017 +0800 -- .../jstorm/translation/TranslatorRegistry.java | 18 - .../translator/CombineGloballyTranslator.java | 25 - .../translator/CombinePerKeyTranslator.java | 25 - .../translator/ReshuffleTranslator.java | 24 - .../translator/WindowBoundTranslator.java | 47 -- .../util/DefaultSideInputReader.java| 45 -- .../translator/CoGroupByKeyTest.java| 301 - .../translation/translator/GroupByKeyTest.java | 155 - .../translation/translator/ParDoTest.java | 624 --- 9 files changed, 1264 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java index bce5b3e..316186e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java @@ -49,30 +49,12 @@ public class TranslatorRegistry { static { TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator()); TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator()); -// TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); -// TRANSLATORS.put(TextIO.Write.Bound.class, 
new TextIOWriteBoundStreamingTranslator()); - TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator()); TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator()); - -//TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>()); TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>()); - TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator()); - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); - TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator()); - -/** - * Currently, empty translation is required for combine and reshuffle. - * Because, the transforms will be mapped to GroupByKey and Pardo finally. - * So we only need to translator the finally transforms. - * If any improvement is required, the composite transforms will be translated in the future. - */ -// TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); -// TRANSLATORS.put(Globally.class, new CombineGloballyTranslator()); -// TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator()); } public static TransformTranslator getTranslator(PTransform transform) { http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java deleted file mode 100644 index fe5fca9..000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, ei
[32/53] [abbrv] beam git commit: jstorm-runner: disable validates runner tests with TestStream and Metrics.
jstorm-runner: disable validates runner tests with TestStream and Metrics. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc6f63ca Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc6f63ca Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc6f63ca Branch: refs/heads/jstorm-runner Commit: dc6f63cafa9c91535f48aa483d316bb9a0d12a41 Parents: a5af6d2 Author: Pei He <p...@apache.org> Authored: Tue Jul 18 14:48:25 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:58 2017 +0800 -- runners/jstorm/pom.xml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/dc6f63ca/runners/jstorm/pom.xml -- diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml index 9808cd2..5d54d94 100644 --- a/runners/jstorm/pom.xml +++ b/runners/jstorm/pom.xml @@ -61,8 +61,10 @@ org.apache.beam.sdk.testing.UsesSetState, +org.apache.beam.sdk.testing.UsesSplittableParDo, +org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics, -org.apache.beam.sdk.testing.UsesSplittableParDo +org.apache.beam.sdk.testing.UsesTestStream none true
[28/53] [abbrv] beam git commit: jstorm-runner: 1. Use the TupleTag of "PCollection expand" when getting input tags and output tags 2. Check the exception record when asserting of unit test
jstorm-runner: 1. Use the TupleTag of "PCollection expand" when getting input tags and output tags 2. Check the exception record when asserting of unit test Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30f3eda6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30f3eda6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30f3eda6 Branch: refs/heads/jstorm-runner Commit: 30f3eda64c68cea092c42b7acc1dfd98eb8cbbd0 Parents: df154de Author: basti.lj <basti...@alibaba-inc.com> Authored: Mon Jul 17 15:55:01 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:58 2017 +0800 -- .../beam/runners/jstorm/TestJStormRunner.java | 9 + .../runners/jstorm/translation/DoFnExecutor.java| 6 +- .../jstorm/translation/MultiOutputDoFnExecutor.java | 6 +- .../jstorm/translation/ParDoBoundTranslator.java| 5 +++-- .../jstorm/translation/TranslationContext.java | 16 +--- 5 files changed, 27 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index b1b0379..a117675 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -8,6 +8,7 @@ import com.alibaba.jstorm.metric.AsmWindow; import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.metric.MetaType; import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.task.error.TaskReportErrorAndDie; import com.alibaba.jstorm.utils.JStormUtils; import com.google.common.base.Optional; import com.google.common.collect.Maps; @@ -56,14 +57,21 @@ public class TestJStormRunner extends PipelineRunner { if 
(numberOfAssertions == 0) { // If assert number is zero, wait 5 sec JStormUtils.sleepMs(5000); +Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); +if (taskExceptionRec != null) { + throw new RuntimeException(taskExceptionRec.getCause()); +} return result; } else { for (int i = 0; i < 40; ++i) { Optional success = checkForPAssertSuccess(numberOfAssertions); + Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); if (success.isPresent() && success.get()) { return result; } else if (success.isPresent() && !success.get()) { throw new AssertionError("Failed assertion checks."); + } else if (taskExceptionRec != null) { +throw new RuntimeException(taskExceptionRec.getCause()); } else { JStormUtils.sleepMs(500); } @@ -74,6 +82,7 @@ public class TestJStormRunner extends PipelineRunner { } finally { clearPAssertCount(); cancel(result); + TaskReportErrorAndDie.setExceptionRecord(null); } } http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index fdd9af6..6baa944 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -203,8 +203,12 @@ class DoFnExecutor<InputT, OutputT> implements Executor { tag, mainInputTag, sideInputs, elem.getValue())); if (mainInputTag.equals(tag)) { processMainInput(elem); -} else { +} else if (sideInputTagToView.containsKey(tag)) { processSideInput(tag, elem); +} else { + LOG.warn("Discard unexpected elem={} from tag={}", elem.getValue(), tag); + LOG.warn("Current mainInputTag={}, sideInputTags={}", + mainInputTag, sideInputTagToView.keySet()); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/30f3eda6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java -- diff --git a/runners/jstorm/src/main/java/org/ap
[12/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java index 2d80617..7f98c61 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java @@ -17,177 +17,175 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; +import static com.google.common.base.Preconditions.checkNotNull; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Values; +import com.alibaba.jstorm.utils.KryoSerializer; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.runners.jstorm.translation.util.CommonInstance; -import com.alibaba.jstorm.utils.KryoSerializer; +import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; - -import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Values; - import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Map; -import 
java.util.concurrent.atomic.AtomicBoolean; - -import static com.google.common.base.Preconditions.checkNotNull; - /** * Spout implementation that wraps a Beam UnboundedSource - * + * * TODO: add wrapper to support metrics in UnboundedSource. */ public class UnboundedSourceSpout extends AdaptorBasicSpout { -private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); - -private final String description; -private final UnboundedSource source; -private final SerializedPipelineOptions serializedOptions; -private final TupleTag outputTag; - -private transient JStormPipelineOptions pipelineOptions; -private transient UnboundedSource.UnboundedReader reader; -private transient SpoutOutputCollector collector; - -private volatile boolean hasNextRecord; -private AtomicBoolean activated = new AtomicBoolean(); - -private KryoSerializer serializer; - -private long lastWaterMark = 0l; - -public UnboundedSourceSpout( -String description, -UnboundedSource source, -JStormPipelineOptions options, -TupleTag outputTag) { -this.description = checkNotNull(description, "description"); -this.source = checkNotNull(source, "source"); -this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); -this.outputTag = checkNotNull(outputTag, "outputTag"); -} - -@Override -public synchronized void close() { -try { -activated.set(false); -this.reader.close(); -} catch (IOException e) { -e.printStackTrace(); -} -} - -@Override -public void activate() { -activated.set(true); - -} - -@Override -public void deactivate() { -activated.set(false); -} - -@Override -public void ack(Object msgId) { -throw new UnsupportedOperationException(); + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); + + private final String description; + private final UnboundedSource source; + private final SerializedPipelineOptions serializedOptions; + private final TupleTag outputTag; + + private transient JStormPipelineOptions 
pipelineOptions; + private transient UnboundedSource.UnboundedReader reader; + private transient SpoutOutputCollector collector; + + private volatile boolean hasNextRecord; + private AtomicBoolean activated = new AtomicBoolean(); + + private KryoSerializer serializer; + + private long lastWaterMark = 0l; + + public UnboundedSourceSpout( + String description, + UnboundedSource source, + JStormPipelineOptions options, + TupleTag outputTag) { +this.description = checkNotNull(description, "description"); +this.source = checkNotNull(source, "source"); +this.serializedOptions = new
[13/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java index 9df1e17..e80fb48 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.jstorm.translation.runtime; -import java.io.IOException; -import java.util.*; +import static com.google.common.base.Preconditions.checkNotNull; import avro.shaded.com.google.common.base.Joiner; import avro.shaded.com.google.common.collect.Sets; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; import backtype.storm.tuple.ITupleExt; -import org.apache.beam.runners.jstorm.translation.util.CommonInstance; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.cache.KvStoreManagerFactory; import com.alibaba.jstorm.cluster.Common; @@ -31,6 +33,14 @@ import com.alibaba.jstorm.utils.KryoSerializer; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.runners.jstorm.translation.util.CommonInstance; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -39,289 +49,287 @@ import org.joda.time.Instant; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import static com.google.common.base.Preconditions.checkNotNull; - public class ExecutorsBolt extends AdaptorBasicBolt { -private static final long serialVersionUID = -7751043327801735211L; + private static final long serialVersionUID = -7751043327801735211L; -private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); + private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); -protected ExecutorContext executorContext; + protected ExecutorContext executorContext; -protected TimerService timerService; + protected TimerService timerService; -// map from input tag to executor inside bolt -protected final MapinputTagToExecutor = Maps.newHashMap(); -// set of all output tags that will be emit outside bolt -protected final Set outputTags = Sets.newHashSet(); -protected final Set externalOutputTags = Sets.newHashSet(); -protected final Set doFnExecutors = Sets.newHashSet(); -protected int internalDoFnExecutorId = 1; -protected final Map idToDoFnExecutor = Maps.newHashMap(); + // map from input tag to executor inside bolt + protected final Map inputTagToExecutor = Maps.newHashMap(); + // set of all output tags that will be emit outside bolt + protected final Set outputTags = Sets.newHashSet(); + protected final Set externalOutputTags = Sets.newHashSet(); + protected final Set doFnExecutors = Sets.newHashSet(); + protected int internalDoFnExecutorId = 1; + protected final Map idToDoFnExecutor = Maps.newHashMap(); -protected OutputCollector collector; + protected OutputCollector collector; -protected boolean isStatefulBolt = false; + protected boolean isStatefulBolt = false; -protected KryoSerializer serializer; + protected KryoSerializer serializer; -public ExecutorsBolt() { + public ExecutorsBolt() { -} - -public void 
setStatefulBolt(boolean isStateful) { -isStatefulBolt = isStateful; -} - -public void addExecutor(TupleTag inputTag, Executor executor) { -inputTagToExecutor.put( -checkNotNull(inputTag, "inputTag"), -checkNotNull(executor, "executor")); -} - -public Map getExecutors() { -return inputTagToExecutor; -} - -public void registerExecutor(Executor executor) { -if (executor instanceof DoFnExecutor) { -DoFnExecutor doFnExecutor = (DoFnExecutor) executor; -idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor); -
[07/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/StatefulDoFnExecutor.java -- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/StatefulDoFnExecutor.java deleted file mode 100644 index 889977b..000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/StatefulDoFnExecutor.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.beam.translation.runtime; - -import com.alibaba.jstorm.beam.StormPipelineOptions; -import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals; -import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -public class StatefulDoFnExecutor extends DoFnExecutor{ -public StatefulDoFnExecutor( -String stepName, String description, StormPipelineOptions pipelineOptions, -DoFn doFn, Coder inputCoder, -WindowingStrategy windowingStrategy, TupleTag mainInputTag, -Collection sideInputs, Map -sideInputTagToView, TupleTag mainTupleTag, List sideOutputTags) { -super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, -mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); -} - -@Override -public void process(TupleTag tag, WindowedValue elem) { -if (mainInputTag.equals(tag)) { -WindowedValue kvElem = (WindowedValue) elem; -stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, -executorContext.getExecutorsBolt().timerService())); -stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), -kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); -processMainInput(elem); -} else { -processSideInput(tag, elem); -} -} - -@Override -public void onTimer(Object key, TimerInternals.TimerData timerData) { -stepContext.setStateInternals(new JStormStateInternals<>(key, -kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); 
-super.onTimer(key, timerData); -} -} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerService.java -- diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerService.java deleted file mode 100644 index 60d2f1a..000 --- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerService.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed
[23/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduce their visibility to package-private.
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java new file mode 100644 index 000..dab9518 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.tuple.Values; +import com.alibaba.jstorm.utils.KryoSerializer; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.runners.jstorm.JStormPipelineOptions; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Spout implementation that wraps a Beam UnboundedSource. + * TODO: add wrapper to support metrics in UnboundedSource. 
+ */ +public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); + + private final String description; + private final UnboundedSource source; + private final SerializedPipelineOptions serializedOptions; + private final TupleTag outputTag; + + private transient JStormPipelineOptions pipelineOptions; + private transient UnboundedSource.UnboundedReader reader; + private transient SpoutOutputCollector collector; + + private volatile boolean hasNextRecord; + private AtomicBoolean activated = new AtomicBoolean(); + + private KryoSerializer serializer; + + private long lastWaterMark = 0L; + + public UnboundedSourceSpout( + String description, + UnboundedSource source, + JStormPipelineOptions options, + TupleTag outputTag) { +this.description = checkNotNull(description, "description"); +this.source = checkNotNull(source, "source"); +this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); +this.outputTag = checkNotNull(outputTag, "outputTag"); + } + + @Override + public synchronized void close() { +try { + activated.set(false); + this.reader.close(); +} catch (IOException e) { + e.printStackTrace(); +} + } + + @Override + public void activate() { +activated.set(true); + + } + + @Override + public void deactivate() { +activated.set(false); + } + + @Override + public void ack(Object msgId) { +throw new UnsupportedOperationException(); + } + + @Override + public void fail(Object msgId) { +throw new UnsupportedOperationException(); + } + + @Override + public MapgetComponentConfiguration() { +return null; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { +try { + this.collector = collector; + this.pipelineOptions = + this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); + + createSourceReader(null); + + this.serializer = new KryoSerializer<>(conf); +} 
catch (IOException e) { + throw new RuntimeException("Unable to create unbounded reader.", e); +} + } + + public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException { +if (reader != null) { + reader.close(); +} +reader = this.source.createReader(this.pipelineOptions,
[34/53] [abbrv] beam git commit: jstorm-runner: update jstorm runner package name in logback config.
jstorm-runner: update jstorm runner package name in logback config. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bf32247 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bf32247 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bf32247 Branch: refs/heads/jstorm-runner Commit: 1bf3224708b6b2a9c195eddf65a7bfb279d6806d Parents: aca16cc Author: Pei He <p...@apache.org> Authored: Wed Jul 19 16:14:14 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:59 2017 +0800 -- runners/jstorm/src/test/resources/logback.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/1bf32247/runners/jstorm/src/test/resources/logback.xml -- diff --git a/runners/jstorm/src/test/resources/logback.xml b/runners/jstorm/src/test/resources/logback.xml index be3bc8f..635933c 100644 --- a/runners/jstorm/src/test/resources/logback.xml +++ b/runners/jstorm/src/test/resources/logback.xml @@ -14,7 +14,7 @@ - +
[19/53] [abbrv] beam git commit: jstorm-runner: remove AdaptorBasicBolt and AdaptorBasicSpout.
jstorm-runner: remove AdaptorBasicBolt and AdaptorBasicSpout. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9309ac49 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9309ac49 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9309ac49 Branch: refs/heads/jstorm-runner Commit: 9309ac49d81e1d6dfd694ec885cdb12a3db53483 Parents: 5a15d54 Author: Pei He <p...@apache.org> Authored: Fri Jul 14 14:50:47 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:57 2017 +0800 -- .../beam/runners/jstorm/JStormRunner.java | 16 +++- .../jstorm/translation/TranslationContext.java | 12 - .../translation/runtime/AdaptorBasicBolt.java | 27 .../translation/runtime/AdaptorBasicSpout.java | 27 .../translation/runtime/ExecutorsBolt.java | 3 ++- .../runtime/UnboundedSourceSpout.java | 3 ++- 6 files changed, 14 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 00ec7f6..8782130 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -41,8 +41,6 @@ import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSeria import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent; -import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicBolt; -import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout; import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; import 
org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt; import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout; @@ -155,18 +153,12 @@ public class JStormRunner extends PipelineRunner { private AbstractComponent getComponent( String id, TranslationContext.ExecutionGraphContext context) { -AbstractComponent component = null; -AdaptorBasicSpout spout = context.getSpout(id); +AbstractComponent spout = context.getSpout(id); if (spout != null) { - component = spout; + return spout; } else { - AdaptorBasicBolt bolt = context.getBolt(id); - if (bolt != null) { -component = bolt; - } + return context.getBolt(id); } - -return component; } private StormTopology getTopology( @@ -176,7 +168,7 @@ public class JStormRunner extends PipelineRunner { isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder(); int parallelismNumber = options.getParallelismNumber(); -Map<String, AdaptorBasicSpout> spouts = context.getSpouts(); +Map<String, UnboundedSourceSpout> spouts = context.getSpouts(); for (String id : spouts.keySet()) { IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id)); builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber)); http://git-wip-us.apache.org/repos/asf/beam/blob/9309ac49/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java index 1230a31..28d102d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java @@ -34,9 +34,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import 
org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout; import org.apache.beam.runners.jstorm.translation.runtime.Executor; import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt; +import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; import org.apache.beam.runners.jstorm.translation.translator.Stream; import org.apache.beam.runners.jstorm.translation.util.CommonInstance; import org.apache.beam.runners.jstorm.uti
[50/53] [abbrv] beam git commit: jstorm-runner: Add Kryo serializer for UnmodifiableIterable
jstorm-runner: Add Kryo serializer for UnmodifiableIterable Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e808730 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e808730 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e808730 Branch: refs/heads/jstorm-runner Commit: 9e8087306b5562fdecf678979b9f2d49dfaf368f Parents: 90ed2ef Author: basti.lj <basti...@alibaba-inc.com> Authored: Wed Aug 16 19:01:48 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:01 2017 +0800 -- .../BeamSdkRepackUtilsSerializer.java | 34 .../serialization/GuavaUtilsSerializer.java | 34 2 files changed, 68 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/9e808730/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java index 4ae47eb..2912194 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java @@ -24,12 +24,14 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; import java.util.EnumMap; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet; import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable; 
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterables; import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists; import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps; import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets; @@ -244,10 +246,42 @@ public class BeamSdkRepackUtilsSerializer { ImmutableSetSerializer.class); } + /** + * Specific serializer of {@link Kryo} for UnmodifiableIterable. + */ + public static class UnmodifiableIterableSerializer extends Serializer<Iterable> { + +@Override +public void write(Kryo kryo, Output output, Iterable object) { + int size = Iterables.size(object); + output.writeInt(size, true); + for (Object elm : object) { +kryo.writeClassAndObject(output, elm); + } +} + +@Override +public Iterable read(Kryo kryo, Input input, Class<Iterable> type) { + final int size = input.readInt(true); + List iterable = Lists.newArrayList(); + for (int i = 0; i < size; ++i) { +iterable.add(kryo.readClassAndObject(input)); + } + return Iterables.unmodifiableIterable(iterable); +} + } + + private static void registerUnmodifiableIterablesSerializers(Config config) { +config.registerSerialization( +Iterables.unmodifiableIterable(Lists.newArrayList()).getClass(), +UnmodifiableIterableSerializer.class); + } + public static void registerSerializers(Config config) { registerImmutableListSerializers(config); registerImmutableMapSerializers(config); registerImmutableSetSerializers(config); +registerUnmodifiableIterablesSerializers(config); } } http://git-wip-us.apache.org/repos/asf/beam/blob/9e808730/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java index e6f750c..ee83aa6 100644 --- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableTable; +import com.google.common.collect.Iterables; import com.google.co
[48/53] [abbrv] beam git commit: jstorm-runner: Add maven repository for JStorm dependency
jstorm-runner: Add maven repository for JStorm dependency Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/00b9c5c8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/00b9c5c8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/00b9c5c8 Branch: refs/heads/jstorm-runner Commit: 00b9c5c867a57c291c43caa2b554dda9bb3228ba Parents: 87aaa6e Author: basti.lj <basti...@alibaba-inc.com> Authored: Thu Aug 17 19:13:50 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:01 2017 +0800 -- runners/jstorm/pom.xml | 31 +++ 1 file changed, 31 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/00b9c5c8/runners/jstorm/pom.xml -- diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml index 75387ef..681adb5 100644 --- a/runners/jstorm/pom.xml +++ b/runners/jstorm/pom.xml @@ -34,6 +34,37 @@ 2.5.0-SNAPSHOT + + + apache.snapshots + Apache Snapshot Repository + https://repository.apache.org/snapshots + +false + + + + ossrh releases + https://oss.sonatype.org/content/repositories/releases + +true + + +false + + + + ossrh snapshots + https://oss.sonatype.org/content/repositories/snapshots + +false + + +true + + + +
[51/53] [abbrv] beam git commit: jstorm-runner: Add job file to run ValidatesRunner tests of JStorm runner
jstorm-runner: Add job file to run ValidatesRunner tests of JStorm runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cb1c3f46 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cb1c3f46 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cb1c3f46 Branch: refs/heads/jstorm-runner Commit: cb1c3f4694a777b2f5a19959561e388bda36a972 Parents: 9e80873 Author: basti.lj <basti...@alibaba-inc.com> Authored: Wed Aug 16 19:03:38 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:01 2017 +0800 -- ...ostCommit_Java_ValidatesRunner_JStorm.groovy | 43 1 file changed, 43 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/cb1c3f46/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_JStorm.groovy -- diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_JStorm.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_JStorm.groovy new file mode 100644 index 000..f7578fa --- /dev/null +++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_JStorm.groovy @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import common_job_properties + +// This job runs the suite of ValidatesRunner tests against the JStorm runner. +mavenJob('beam_PostCommit_Java_ValidatesRunner_JStorm') { + description('Runs the ValidatesRunner suite on the JStorm runner.') + previousNames('beam_PostCommit_Java_RunnableOnService_JStorm') + + // Set common parameters. + common_job_properties.setTopLevelMainJobProperties(delegate) + + // Set maven parameters. + common_job_properties.setMavenConfig(delegate) + + // Sets that this is a PostCommit job. + common_job_properties.setPostCommit(delegate) + + // Allows triggering this build against pull requests. + common_job_properties.enablePhraseTriggeringFromPullRequest( +delegate, +'Apache JStorm Runner ValidatesRunner Tests', +'Run JStorm ValidatesRunner') + + // Maven goals for this job. + goals('-B -e clean verify -am -pl runners/jstorm -Plocal-validates-runner-tests -Pvalidates-runner-tests') +}
[04/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java new file mode 100644 index 000..1de881f --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import backtype.storm.task.TopologyContext; +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.google.auto.value.AutoValue; + +@AutoValue +public abstract class ExecutorContext { +public static ExecutorContext of(TopologyContext topologyContext, ExecutorsBolt bolt, IKvStoreManager kvStoreManager) { +return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager); +} + +public abstract TopologyContext getTopologyContext(); + +public abstract ExecutorsBolt getExecutorsBolt(); + +public abstract IKvStoreManager getKvStoreManager(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java new file mode 100644 index 000..9df1e17 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation.runtime; + +import java.io.IOException; +import java.util.*; + +import avro.shaded.com.google.common.base.Joiner; +import avro.shaded.com.google.common.collect.Sets; +import backtype.storm.tuple.ITupleExt; +import org.apache.beam.runners.jstorm.translation.util.CommonInstance; +import com.alibaba.jstorm.cache.IKvStoreManager; +import com.alibaba.jstorm.cache.KvStoreManagerFactory; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.utils.KryoSerializer; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Maps; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class ExecutorsBolt extends AdaptorBasicBolt { +private static final long serialVersionUID = -7751043327801735211L; + +private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class); + +protected ExecutorContext
[10/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java -- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java index 11c7c94..2a8160c 100644 --- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,18 +17,27 @@ */ package org.apache.beam.runners.jstorm.translation.runtime.state; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + import avro.shaded.com.google.common.collect.Maps; -import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl; import com.alibaba.jstorm.cache.IKvStoreManager; import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory; import com.alibaba.jstorm.utils.KryoSerializer; - +import java.util.Iterator; +import java.util.Map; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; @@ -40,180 +49,174 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.Iterator; -import java.util.Map; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.hasEntry; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - /** * Tests for {@link JStormStateInternals}. 
*/ @RunWith(JUnit4.class) public class JStormStateInternalsTest { -@Rule -public final TemporaryFolder tmp = new TemporaryFolder(); - -private JStormStateInternals jstormStateInternals; - -@Before -public void setup() throws Exception { -IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager( -Maps.newHashMap(), -"test", -tmp.toString(), -new KryoSerializer(Maps.newHashMap())); -jstormStateInternals = new JStormStateInternals("key-1", kvStoreManager, new TimerServiceImpl(), 0); -} - -@Test -public void testValueState() throws Exception { -ValueState valueState = jstormStateInternals.state( -StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); -valueState.write(Integer.MIN_VALUE); -assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); -valueState.write(Integer.MAX_VALUE); -assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); -} - -@Test -public void testValueStateIdenticalId() throws Exception { -ValueState valueState = jstormStateInternals.state( -StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); -ValueState valueStateIdentical = jstormStateInternals.state( -StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of())); - -valueState.write(Integer.MIN_VALUE); -assertEquals(Integer.MIN_VALUE, valueState.read().longValue()); -assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue()); -valueState.write(Integer.MAX_VALUE); -assertEquals(Integer.MAX_VALUE, valueState.read().longValue()); -assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue()); + @Rule + public final TemporaryFolder tmp
[44/53] [abbrv] beam git commit: jstorm-runner: add missing "apache license" header for some files.
jstorm-runner: add missing "apache license" header for some files. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61e9fa65 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61e9fa65 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61e9fa65 Branch: refs/heads/jstorm-runner Commit: 61e9fa6581377dcb50edb4e4b48bac353cb3ba0d Parents: e182cf7 Author: basti.lj <basti...@alibaba-inc.com> Authored: Mon Jul 24 13:00:37 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:00 2017 +0800 -- .../beam/runners/jstorm/TestJStormRunner.java | 17 + .../serialization/CollectionsSerializer.java | 17 + .../serialization/ImmutableListSerializer.java| 17 + .../serialization/ImmutableMapSerializer.java | 17 + .../serialization/ImmutableSetSerializer.java | 17 + .../serialization/KvStoreIterableSerializer.java | 17 + .../SdkRepackImmuListSerializer.java | 18 +- .../serialization/SdkRepackImmuSetSerializer.java | 17 + .../SdkRepackImmutableMapSerializer.java | 17 + .../UnmodifiableCollectionsSerializer.java| 17 + runners/jstorm/src/test/resources/logback.xml | 14 ++ 11 files changed, 184 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/61e9fa65/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index 3124da2..21a58e3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.jstorm; import static com.google.common.base.Preconditions.checkNotNull; http://git-wip-us.apache.org/repos/asf/beam/blob/61e9fa65/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java index 0548196..1c8053e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.jstorm.serialization; import backtype.storm.Config; http://git-wip-us.apache.org/repos/asf/beam/blob/61e9fa65/runners/jstorm/src/main/java/org/apache/
[38/53] [abbrv] beam git commit: jstorm-runner: minor update for exception handling of TestJStormRunner
jstorm-runner: minor update for exception handling of TestJStormRunner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad046483 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad046483 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad046483 Branch: refs/heads/jstorm-runner Commit: ad046483e6d2341a4a2156e0db15f213c7f7feea Parents: 1819833 Author: basti.lj <basti...@alibaba-inc.com> Authored: Wed Jul 19 10:57:40 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:59 2017 +0800 -- .../main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/ad046483/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index 0088cf9..3124da2 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -67,7 +67,7 @@ public class TestJStormRunner extends PipelineRunner { throw new AssertionError("Failed assertion checks."); } else if (taskExceptionRec != null) { LOG.info("Exception was found.", taskExceptionRec); - throw new AssertionError(taskExceptionRec.getCause()); + throw new RuntimeException(taskExceptionRec.getCause()); } else { JStormUtils.sleepMs(500); waitTime += 500;
[52/53] [abbrv] beam git commit: jstorm-runner: The failure of testing "SDK Python" blocked the validation of JStorm runner, so comment out "SDK Python" module temporarily. After the validation of JSt
jstorm-runner: The failure of testing "SDK Python" blocked the validation of JStorm runner, so comment out "SDK Python" module temporarily. After the validation of JStorm runner, this commit shall be reverted. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87aaa6e2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87aaa6e2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87aaa6e2 Branch: refs/heads/jstorm-runner Commit: 87aaa6e259f75d890a08da75ad9a175402e06660 Parents: cb1c3f4 Author: basti.lj <basti...@alibaba-inc.com> Authored: Thu Aug 17 17:12:28 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:01 2017 +0800 -- sdks/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/87aaa6e2/sdks/pom.xml -- diff --git a/sdks/pom.xml b/sdks/pom.xml index 27b9610..32c329d 100644 --- a/sdks/pom.xml +++ b/sdks/pom.xml @@ -35,7 +35,7 @@ common java -python +
[47/53] [abbrv] beam git commit: jstorm-runner: 1. Generate execution DAG for runtime 2. Restructure Kryo serializers
jstorm-runner: 1. Generate execution DAG for runtime 2. Restructure Kryo serializers Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6078cbc6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6078cbc6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6078cbc6 Branch: refs/heads/jstorm-runner Commit: 6078cbc6bd5ca6e48e237c652c532b189acef2b7 Parents: 240f61b Author: basti.lj <basti...@alibaba-inc.com> Authored: Wed Aug 9 16:48:42 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:03:00 2017 +0800 -- runners/jstorm/pom.xml | 8 +- .../beam/runners/jstorm/JStormRunner.java | 169 +++-- .../BeamSdkRepackUtilsSerializer.java | 253 +++ .../serialization/BeamUtilsSerializer.java | 114 + .../serialization/CollectionsSerializer.java| 60 - .../serialization/GuavaUtilsSerializer.java | 252 ++ .../serialization/ImmutableListSerializer.java | 106 .../serialization/ImmutableMapSerializer.java | 87 --- .../serialization/ImmutableSetSerializer.java | 92 --- .../serialization/JStormUtilsSerializer.java| 126 + .../serialization/JavaUtilsSerializer.java | 236 + .../KvStoreIterableSerializer.java | 74 -- .../SdkRepackImmuListSerializer.java| 107 .../SdkRepackImmuSetSerializer.java | 95 --- .../SdkRepackImmutableMapSerializer.java| 90 --- .../UnmodifiableCollectionsSerializer.java | 201 --- .../translation/BoundedSourceTranslator.java| 1 + .../jstorm/translation/DoFnExecutor.java| 2 +- .../runners/jstorm/translation/Executor.java| 2 +- .../jstorm/translation/ExecutorsBolt.java | 35 ++- .../jstorm/translation/FlattenTranslator.java | 1 + .../translation/JStormStateInternals.java | 24 +- .../jstorm/translation/TranslationContext.java | 19 +- .../translation/UnboundedSourceSpout.java | 12 + .../translation/UnboundedSourceTranslator.java | 1 + 25 files changed, 1203 insertions(+), 964 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/pom.xml -- diff --git 
a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml index 79634e9..75387ef 100644 --- a/runners/jstorm/pom.xml +++ b/runners/jstorm/pom.xml @@ -53,9 +53,6 @@ test - org.apache.beam.sdk.testing.ValidatesRunner @@ -144,6 +141,11 @@ com.google.auto.value auto-value + +com.googlecode.json-simple +json-simple +1.1 + http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 56db1c6..47de42c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -26,22 +26,25 @@ import backtype.storm.topology.IRichBolt; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; -import com.alibaba.jstorm.cache.KvStoreIterable; +import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.transactional.TransactionTopologyBuilder; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.apache.beam.runners.jstorm.serialization.CollectionsSerializer; -import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer; -import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer; -import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer; -import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer; -import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer; -import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer; -import 
org.apache.beam.runners.jstorm.serialization.SdkRepackImmutableMapSerializer; -import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerialize
[39/53] [abbrv] beam git commit: jstorm-runner: add SdkRepackImmutableMapSerializer.
jstorm-runner: add SdkRepackImmutableMapSerializer. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/588a6981 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/588a6981 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/588a6981 Branch: refs/heads/jstorm-runner Commit: 588a6981855b68b9733a1b0f368dce0ad5cfe837 Parents: ad04648 Author: Pei He <p...@apache.org> Authored: Wed Jul 19 11:13:04 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:59 2017 +0800 -- .../beam/runners/jstorm/JStormRunner.java | 2 + .../SdkRepackImmutableMapSerializer.java| 73 2 files changed, 75 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/588a6981/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index baf4e5a..286a975 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer; import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer; import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer; import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer; +import org.apache.beam.runners.jstorm.serialization.SdkRepackImmutableMapSerializer; import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer; import org.apache.beam.runners.jstorm.translation.AbstractComponent; import org.apache.beam.runners.jstorm.translation.CommonInstance; @@ -103,6 +104,7 @@ public class JStormRunner extends PipelineRunner { ImmutableSetSerializer.registerSerializers(config); 
SdkRepackImmuSetSerializer.registerSerializers(config); ImmutableMapSerializer.registerSerializers(config); +SdkRepackImmutableMapSerializer.registerSerializers(config); config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class); return config; http://git-wip-us.apache.org/repos/asf/beam/blob/588a6981/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java new file mode 100644 index 000..546538a --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java @@ -0,0 +1,73 @@ +package org.apache.beam.runners.jstorm.serialization; + +import backtype.storm.Config; +import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; +import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps; + +/** + * Specific serializer of {@link Kryo} for ImmutableMap. + */ +public class SdkRepackImmutableMapSerializer +extends Serializer<ImmutableMap<Object, ? extends Object>> { + + private static final boolean DOES_NOT_ACCEPT_NULL = true; + private static final boolean IMMUTABLE = true; + + public SdkRepackImmutableMapSerializer() { +super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } + + @Override + public void write(Kryo kryo, Output output, ImmutableMap<Object, ? 
extends Object> immutableMap) { +kryo.writeObject(output, Maps.newHashMap(immutableMap)); + } + + @Override + public ImmutableMap<Object, Object> read( + Kryo kryo, + Input input, + Class<ImmutableMap<Object, ? extends Object>> type) { +Map map = kryo.readObject(input, HashMap.class); +return ImmutableMap.copyOf(map); + } + + /** + * Creates a new {@link SdkRepackImmutableMapSerializer} and registers its serializer + * for the several ImmutableMap related classes. + */ + public static void registerSerializers(Config config) { + +config.registerSerialization(ImmutableMap.class, SdkRepackImmutableMapSerializer.class); +config.registerSe
[02/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java new file mode 100644 index 000..481b7fb --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.util; + +import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Default StepContext for running DoFn This does not allow accessing state or timer internals. 
+ */ +public class DefaultStepContext implements ExecutionContext.StepContext { + +private TimerInternals timerInternals; + +private StateInternals stateInternals; + +public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) { +this.timerInternals = checkNotNull(timerInternals, "timerInternals"); +this.stateInternals = checkNotNull(stateInternals, "stateInternals"); +} + +@Override +public String getStepName() { +return null; +} + +@Override +public String getTransformName() { +return null; +} + +@Override +public void noteOutput(WindowedValue windowedValue) { + +} + +@Override +public void noteOutput(TupleTag tupleTag, WindowedValue windowedValue) { + +} + +@Override +publicvoid writePCollectionViewData(TupleTag tag, Iterable data, +Coder > dataCoder, W window, Coder windowCoder) throws IOException { +throw new UnsupportedOperationException("Writing side-input data is not supported."); +} + +@Override +public StateInternals stateInternals() { +return stateInternals; +} + +@Override +public TimerInternals timerInternals() { +return timerInternals; +} + +public void setStateInternals(StateInternals stateInternals) { +this.stateInternals = stateInternals; +} + +public void setTimerInternals(TimerInternals timerInternals) { +this.timerInternals = timerInternals; +} +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java new file mode 100644 index 000..cbf815a --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing
[20/53] [abbrv] beam git commit: jstorm-runner: Fix checkstyle error
jstorm-runner: Fix checkstyle error Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5a15d548 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5a15d548 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5a15d548 Branch: refs/heads/jstorm-runner Commit: 5a15d5488f9438695948e72af08ada4c263471d7 Parents: 78a5076 Author: basti.lj <basti...@alibaba-inc.com> Authored: Fri Jul 14 14:14:49 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:57 2017 +0800 -- .../runners/jstorm/JStormPipelineOptions.java | 3 + .../beam/runners/jstorm/JStormRunner.java | 16 +- .../beam/runners/jstorm/TestJStormRunner.java | 2 +- .../serialization/ImmutableListSerializer.java | 4 +- .../serialization/ImmutableMapSerializer.java | 3 + .../serialization/ImmutableSetSerializer.java | 3 + .../KvStoreIterableSerializer.java | 3 + .../SdkRepackImmuListSerializer.java| 3 + .../SdkRepackImmuSetSerializer.java | 3 + .../UnmodifiableCollectionsSerializer.java | 5 +- .../translation/JStormPipelineTranslator.java | 186 +++ .../translation/StormPipelineTranslator.java| 186 --- .../jstorm/translation/TranslationContext.java | 9 +- .../translation/runtime/AbstractComponent.java | 4 +- .../translation/runtime/AdaptorBasicBolt.java | 5 +- .../translation/runtime/AdaptorBasicSpout.java | 5 +- .../translation/runtime/DoFnExecutor.java | 16 +- .../jstorm/translation/runtime/Executor.java| 7 +- .../translation/runtime/ExecutorContext.java| 3 + .../translation/runtime/ExecutorsBolt.java | 15 +- .../translation/runtime/FlattenExecutor.java| 6 +- .../runtime/GroupByWindowExecutor.java | 5 + .../runtime/MultiOutputDoFnExecutor.java| 7 +- .../runtime/MultiStatefulDoFnExecutor.java | 4 + .../runtime/StatefulDoFnExecutor.java | 4 + .../translation/runtime/TimerServiceImpl.java | 8 +- .../translation/runtime/TxExecutorsBolt.java| 5 +- .../runtime/TxUnboundedSourceSpout.java | 5 +- .../runtime/UnboundedSourceSpout.java | 5 
+- .../runtime/WindowAssignExecutor.java | 7 +- .../runtime/state/JStormBagState.java | 5 +- .../runtime/state/JStormMapState.java | 7 +- .../translator/FlattenTranslator.java | 6 +- .../translator/GroupByKeyTranslator.java| 5 + .../translator/ParDoBoundMultiTranslator.java | 2 +- .../translator/ParDoBoundTranslator.java| 4 +- .../jstorm/translation/translator/Stream.java | 11 +- .../translator/TransformTranslator.java | 4 + .../translation/translator/ViewTranslator.java | 18 +- .../translator/WindowAssignTranslator.java | 7 +- .../jstorm/translation/util/CommonInstance.java | 5 +- .../beam/runners/jstorm/util/RunnerUtils.java | 12 +- .../jstorm/util/SerializedPipelineOptions.java | 2 +- .../jstorm/util/SingletonKeyedWorkItem.java | 3 +- .../runtime/state/JStormStateInternalsTest.java | 14 +- 45 files changed, 384 insertions(+), 258 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java index 2a87756..114877a 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java @@ -64,6 +64,9 @@ public interface JStormPipelineOptions extends PipelineOptions { Map getParallelismNumMap(); void setParallelismNumMap(Map parallelismNumMap); + /** + * Default value factory for topology configuration of JStorm. 
+ */ class DefaultMapValueFactory implements DefaultValueFactory { @Override public Map create(PipelineOptions pipelineOptions) { http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 5375d6e..00ec7f6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -
[05/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java new file mode 100644 index 000..aa7d325 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java @@ -0,0 +1,92 @@ +package org.apache.beam.runners.jstorm.serialization; + +import backtype.storm.Config; +import org.apache.beam.runners.jstorm.util.RunnerUtils; +import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; +import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; +import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; +import com.google.common.collect.*; + +public class ImmutableListSerializer extends Serializer{ + +private static final boolean DOES_NOT_ACCEPT_NULL = false; +private static final boolean IMMUTABLE = true; + +public ImmutableListSerializer() { +super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); +} + +@Override +public void write(Kryo kryo, Output output, ImmutableList object) { +output.writeInt(object.size(), true); +for (Object elm : object) { +kryo.writeClassAndObject(output, elm); +} +} + +@Override +public ImmutableList read(Kryo kryo, Input input, Class type) { +final int size = input.readInt(true); +final Object[] list = new Object[size]; +for (int i = 0; i < size; ++i) { +list[i] = kryo.readClassAndObject(input); +} +return ImmutableList.copyOf(list); +} + +/** + * Creates a new {@link ImmutableListSerializer} and registers its serializer + * for the several ImmutableList related classes. 
+ */ +public static void registerSerializers(Config config) { + +// ImmutableList (abstract class) +// +- RegularImmutableList +// | RegularImmutableList +// +- SingletonImmutableList +// | Optimized for List with only 1 element. +// +- SubList +// | Representation for part of ImmutableList +// +- ReverseImmutableList +// | For iterating in reverse order +// +- StringAsImmutableList +// | Used by Lists#charactersOf +// +- Values (ImmutableTable values) +// Used by return value of #values() when there are multiple cells + +config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class); +config.registerSerialization( +RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class); + +// Note: +// Only registering above is good enough for serializing/deserializing. +// but if using Kryo#copy, following is required. + +config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class); +config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), ImmutableListSerializer.class); +config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class); +config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), ImmutableListSerializer.class); +config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), ImmutableListSerializer.class); +config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1,2,3).subList(1, 2).getClass()), ImmutableListSerializer.class); +config.registerSerialization(ImmutableList.of().reverse().getClass(), ImmutableListSerializer.class); +config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()), ImmutableListSerializer.class); + + config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), ImmutableListSerializer.class); +config.registerSerialization( + 
RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()), ImmutableListSerializer.class); + +Table baseTable = HashBasedTable.create(); +baseTable.put(1, 2, 3); +baseTable.put(4, 5, 6); +Table table = ImmutableTable.copyOf(baseTable); +config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class); +config.registerSerialization( +RunnerUtils.getBeamSdkRepackClass(table.values().getClass()),
[15/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
jstorm-runner: fix checkstyles. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa251a4a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa251a4a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa251a4a Branch: refs/heads/jstorm-runner Commit: aa251a4a4d2850310f5dfd9db4d605cce41bba13 Parents: f3df3a2 Author: Pei He <p...@apache.org> Authored: Thu Jul 13 17:37:51 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:47 2017 +0800 -- .../beam/runners/jstorm/JStormRunner.java | 395 +-- .../runners/jstorm/JStormRunnerRegistrar.java | 39 +- .../beam/runners/jstorm/JStormRunnerResult.java | 118 ++-- .../beam/runners/jstorm/TestJStormRunner.java | 188 +++--- .../serialization/ImmutableListSerializer.java | 152 +++-- .../serialization/ImmutableMapSerializer.java | 78 ++- .../serialization/ImmutableSetSerializer.java | 93 +-- .../KvStoreIterableSerializer.java | 73 +- .../SdkRepackImmuListSerializer.java| 116 ++-- .../SdkRepackImmuSetSerializer.java | 98 +-- .../UnmodifiableCollectionsSerializer.java | 290 .../translation/StormPipelineTranslator.java| 273 .../jstorm/translation/TranslationContext.java | 667 ++- .../jstorm/translation/TranslatorRegistry.java | 65 +- .../translation/runtime/AbstractComponent.java | 66 +- .../translation/runtime/AdaptorBasicBolt.java | 2 +- .../translation/runtime/AdaptorBasicSpout.java | 2 +- .../translation/runtime/DoFnExecutor.java | 511 +++--- .../runtime/DoFnRunnerWithMetrics.java | 3 +- .../jstorm/translation/runtime/Executor.java| 13 +- .../translation/runtime/ExecutorContext.java| 15 +- .../translation/runtime/ExecutorsBolt.java | 502 +++--- .../translation/runtime/FlattenExecutor.java| 61 +- .../runtime/GroupByWindowExecutor.java | 231 --- .../runtime/MultiOutputDoFnExecutor.java| 79 ++- .../runtime/MultiStatefulDoFnExecutor.java | 64 +- .../runtime/StatefulDoFnExecutor.java | 63 +- .../translation/runtime/TimerService.java | 37 +- 
.../translation/runtime/TimerServiceImpl.java | 233 +++ .../translation/runtime/TxExecutorsBolt.java| 193 +++--- .../runtime/TxUnboundedSourceSpout.java | 244 +++ .../runtime/UnboundedSourceSpout.java | 288 .../translation/runtime/ViewExecutor.java | 53 +- .../runtime/WindowAssignExecutor.java | 130 ++-- .../runtime/state/JStormBagState.java | 261 .../runtime/state/JStormCombiningState.java | 98 +-- .../runtime/state/JStormMapState.java | 227 --- .../runtime/state/JStormStateInternals.java | 290 .../runtime/state/JStormValueState.java | 92 ++- .../runtime/state/JStormWatermarkHoldState.java | 88 +-- .../runtime/timer/JStormTimerInternals.java | 143 ++-- .../translator/BoundedSourceTranslator.java | 29 +- .../translator/CombineGloballyTranslator.java | 5 +- .../translator/CombinePerKeyTranslator.java | 5 +- .../translator/FlattenTranslator.java | 34 +- .../translator/GroupByKeyTranslator.java| 71 +- .../translator/ParDoBoundMultiTranslator.java | 143 ++-- .../translator/ParDoBoundTranslator.java| 128 ++-- .../translator/ReshuffleTranslator.java | 4 +- .../jstorm/translation/translator/Stream.java | 109 +-- .../translator/TransformTranslator.java | 74 +- .../translator/UnboundedSourceTranslator.java | 28 +- .../translation/translator/ViewTranslator.java | 586 .../translator/WindowAssignTranslator.java | 26 +- .../translator/WindowBoundTranslator.java | 26 +- .../jstorm/translation/util/CommonInstance.java | 6 +- .../util/DefaultSideInputReader.java| 33 +- .../translation/util/DefaultStepContext.java| 89 +-- .../beam/runners/jstorm/util/RunnerUtils.java | 46 +- .../jstorm/util/SerializedPipelineOptions.java | 51 +- .../jstorm/util/SingletonKeyedWorkItem.java | 3 +- .../jstorm/JStormRunnerRegistrarTest.java | 4 +- .../runtime/state/JStormStateInternalsTest.java | 345 +- 63 files changed, 4314 insertions(+), 4165 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java -- diff 
--git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 39c723b..5fdbe4d 100644 --- a/runners/jstorm/s
[16/53] [abbrv] beam git commit: jstorm-runner: fix compilation error and remove obsolete method.
jstorm-runner: fix compilation error and remove obsolete method. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/78a5076a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/78a5076a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/78a5076a Branch: refs/heads/jstorm-runner Commit: 78a5076a6951a697922aceaabc1e32dd20c8de36 Parents: 4ff42cb Author: basti.lj <basti...@alibaba-inc.com> Authored: Fri Jul 14 10:29:00 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:56 2017 +0800 -- .../beam/runners/jstorm/JStormRunner.java | 4 ++-- .../serialization/ImmutableListSerializer.java | 21 2 files changed, 2 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/78a5076a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 5fdbe4d..5375d6e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -38,7 +38,7 @@ import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer; import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer; import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer; import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer; -import org.apache.beam.runners.jstorm.translation.StormPipelineTranslator; +import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator; import org.apache.beam.runners.jstorm.translation.TranslationContext; import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent; import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicBolt; @@ -114,7 +114,7 @@ public 
class JStormRunner extends PipelineRunner { public JStormRunnerResult run(Pipeline pipeline) { LOG.info("Running pipeline..."); TranslationContext context = new TranslationContext(this.options); -StormPipelineTranslator transformer = new StormPipelineTranslator(context); +JStormPipelineTranslator transformer = new JStormPipelineTranslator(context); transformer.translate(pipeline); LOG.info("UserGraphContext=\n{}", context.getUserGraphContext()); LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext()); http://git-wip-us.apache.org/repos/asf/beam/blob/78a5076a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java index fa4eeb6..c479f26 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java @@ -60,49 +60,28 @@ public class ImmutableListSerializer extends Serializer<ImmutableList> { // Used by return value of #values() when there are multiple cells config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class); -config.registerSerialization( -RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class); // Note: // Only registering above is good enough for serializing/deserializing. // but if using Kryo#copy, following is required. 
config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class); -config.registerSerialization( -RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), -ImmutableListSerializer.class); config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class); config.registerSerialization( -RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), -ImmutableListSerializer.class); -config.registerSerialization( ImmutableList.of(1, 2, 3).subList(1, 2).getClass(), ImmutableListSerializer.class); config.registerSerialization( -RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1, 2, 3).subList(1, 2).getClass()), -ImmutableListSerializer.class); -config.registerSerialization
[26/53] [abbrv] beam git commit: jstorm-runner: remove code that was commented out.
jstorm-runner: remove code that was commented out. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8cdd41b1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8cdd41b1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8cdd41b1 Branch: refs/heads/jstorm-runner Commit: 8cdd41b1d4c7cd5aaf96f3f9c6c2fd203c047e02 Parents: 74ceac6 Author: Pei He <p...@apache.org> Authored: Fri Jul 14 15:51:22 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Sat Aug 19 12:02:57 2017 +0800 -- .../beam/runners/jstorm/translation/ExecutorsBolt.java | 9 - .../runners/jstorm/translation/JStormStateInternals.java| 1 - .../runners/jstorm/translation/TransformTranslator.java | 2 -- 3 files changed, 4 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/8cdd41b1/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index ef12db8..ce6ea2c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -319,12 +319,11 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { @Override public String toString() { -// LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString()); List ret = new ArrayList<>(); -/*ret.add("inputTags"); -for (TupleTag inputTag : inputTagToExecutor.keySet()) { -ret.add(inputTag.getId()); -}*/ +ret.add("inputTags"); +for (TupleTag inputTag : inputTagToExecutor.keySet()) { + ret.add(inputTag.getId()); +} ret.add("internalExecutors"); for (Executor executor : inputTagToExecutor.values()) { ret.add(executor.toString()); 
http://git-wip-us.apache.org/repos/asf/beam/blob/8cdd41b1/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java index fce870f..78882f2 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java @@ -73,7 +73,6 @@ class JStormStateInternals implements StateInternals { @Override public T state( StateNamespace namespace, StateTag address, StateContext c) { -// throw new UnsupportedOperationException("StateContext is not supported."); /** * TODO： * Same implementation as state() which is without StateContext. This might be updated after http://git-wip-us.apache.org/repos/asf/beam/blob/8cdd41b1/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java index edd3d8a..4d431d3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java @@ -62,7 +62,6 @@ interface TransformTranslator> { @Override public String apply(Map.Entry<TupleTag, PValue> taggedPValue) { return taggedPValue.getKey().getId(); - // return taggedPValue.getValue().getName(); } })), transform.getName(), @@ -71,7 +70,6 @@ interface TransformTranslator> { @Override public String apply(Map.Entry<TupleTag, PValue> taggedPvalue) { return taggedPvalue.getKey().getId(); - //return 
taggedPValue.getValue().getName(); } }))); }
[24/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduce their visibility to package private.
http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java new file mode 100644 index 000..fce870f --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.jstorm.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.alibaba.jstorm.cache.ComposedKey; +import com.alibaba.jstorm.cache.IKvStoreManager; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.StateBinder; +import org.apache.beam.sdk.state.StateContext; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.joda.time.Instant; + +/** + * JStorm implementation of {@link StateInternals}. 
+ */ +class JStormStateInternals implements StateInternals { + + private static final String STATE_INFO = "state-info:"; + + @Nullable + private final K key; + private final IKvStoreManager kvStoreManager; + private final TimerService timerService; + private final int executorId; + + public JStormStateInternals(K key, IKvStoreManager kvStoreManager, + TimerService timerService, int executorId) { +this.key = key; +this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager"); +this.timerService = checkNotNull(timerService, "timerService"); +this.executorId = executorId; + } + + @Nullable + @Override + public K getKey() { +return key; + } + + @Override + public T state( + StateNamespace namespace, StateTag address, StateContext c) { +// throw new UnsupportedOperationException("StateContext is not supported."); +/** + * TODOï¼ + * Same implementation as state() which is without StateContext. This might be updated after + * we figure out if we really need StateContext for JStorm state internals. + */ +return state(namespace, address); + } + + @Override + public T state(final StateNamespace namespace, StateTag address) { +return address.getSpec().bind(address.getId(), new StateBinder() { + @Override + public ValueState bindValue(String id, StateSpecspec, Coder coder) { +try { + return new JStormValueState<>( + getKey(), namespace, kvStoreManager. getOrCreate(getStoreId(id))); +} catch (IOException e) { + throw new RuntimeException(); +} + } + + @Override + public BagState bindBag(String id, StateSpec spec, Coder elemCoder) { +try { + return new JStormBagState( + getKey(), namespace, kvStoreManager. getOrCreate(getStoreId(id)), + kvStoreManager. getOrCreate(STATE_INFO + getStoreId(id))); +} catch (IOException e) { + throw new RuntimeException(); +} + } + + @Override + public SetState bindSet(String id, StateSpec spec, Coder elemCoder) { +throw new
[jira] [Closed] (BEAM-1612) Support real Bundle in Flink runner
[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He closed BEAM-1612. Resolution: Fixed > Support real Bundle in Flink runner > --- > > Key: BEAM-1612 > URL: https://issues.apache.org/jira/browse/BEAM-1612 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Jingsong Lee >Assignee: Jingsong Lee > Fix For: 2.2.0 > > > The Bundle is very important in the Beam model. Users can use the bundle to > flush buffers and to reuse many heavyweight resources within a bundle. Most IO > plugins use the bundle to flush. > Moreover, FlinkRunner can also use Bundle to reduce access to the FlinkState, > such as first placing state in JavaHeap and flushing it into RocksDbState when > finishBundle is invoked; this can reduce the amount of serialization. > But now FlinkRunner calls finishBundle after every processElement. We need > to support real Bundles. > I think we can have the following implementations: > 1.Invoke finishBundle and next startBundle in {{snapshot}} of Flink. But > sometimes this "Bundle" may be too big. This depends on the user's checkpoint > configuration. > 2.Manually control the size of the bundle. The half-bundle will be flushed to > a full-bundle by count or eventTime or processTime or {{snapshot}}. We do not > need to wait, just call the startBundle and finishBundle at the right time. > [Proposal > document|https://docs.google.com/document/d/1UzELM4nFu8SIeu-QJkbs0sv7Uzd1Ux4aXXM3cw4s7po/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[2/2] beam git commit: This closes #3368
This closes #3368 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/724eda37 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/724eda37 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/724eda37 Branch: refs/heads/master Commit: 724eda37ea1e54aac089d89c711ca3cee14a4603 Parents: 3a8b0b6 ceec7ce Author: Pei He <p...@apache.org> Authored: Wed Aug 16 11:46:49 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Wed Aug 16 11:46:49 2017 +0800 -- .../runners/flink/FlinkPipelineOptions.java | 11 + .../FlinkStreamingTransformTranslators.java | 77 ++-- .../wrappers/streaming/DoFnOperator.java| 412 ++- .../streaming/SplittableDoFnOperator.java | 4 +- .../wrappers/streaming/WindowDoFnOperator.java | 4 +- .../state/FlinkSplitStateInternals.java | 8 +- .../beam/runners/flink/PipelineOptionsTest.java | 21 +- .../flink/streaming/DoFnOperatorTest.java | 161 ++-- 8 files changed, 535 insertions(+), 163 deletions(-) --
[1/2] beam git commit: [BEAM-1612] Support real Bundle in Flink runner
Repository: beam Updated Branches: refs/heads/master 3a8b0b68c -> 724eda37e [BEAM-1612] Support real Bundle in Flink runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ceec7ce5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ceec7ce5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ceec7ce5 Branch: refs/heads/master Commit: ceec7ce5ba287ab40ee1f7c87129b72d4db1c1c7 Parents: 3a8b0b6 Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Thu Jun 15 17:48:59 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Wed Aug 16 11:44:49 2017 +0800 -- .../runners/flink/FlinkPipelineOptions.java | 11 + .../FlinkStreamingTransformTranslators.java | 77 ++-- .../wrappers/streaming/DoFnOperator.java| 412 ++- .../streaming/SplittableDoFnOperator.java | 4 +- .../wrappers/streaming/WindowDoFnOperator.java | 4 +- .../state/FlinkSplitStateInternals.java | 8 +- .../beam/runners/flink/PipelineOptionsTest.java | 21 +- .../flink/streaming/DoFnOperatorTest.java | 161 ++-- 8 files changed, 535 insertions(+), 163 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index c255672..2432394 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -127,4 +127,15 @@ public interface FlinkPipelineOptions @Default.Boolean(false) Boolean getRetainExternalizedCheckpointsOnCancellation(); void setRetainExternalizedCheckpointsOnCancellation(Boolean retainOnCancellation); + + @Description("The maximum number of elements in a bundle.") + @Default.Long(1000) + Long getMaxBundleSize(); + void 
setMaxBundleSize(Long size); + + @Description("The maximum time to wait before finalising a bundle (in milliseconds).") + @Default.Long(1000) + Long getMaxBundleTimeMills(); + void setMaxBundleTimeMills(Long time); + } http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 3d7e81f..058e195 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -339,7 +339,9 @@ class FlinkStreamingTransformTranslators { List<TupleTag> additionalOutputTags, FlinkStreamingTranslationContext context, WindowingStrategy windowingStrategy, - Map<TupleTag, OutputTag<WindowedValue>> tagsToLabels, + Map<TupleTag, OutputTag<WindowedValue>> tagsToOutputTags, + Map<TupleTag, Coder<WindowedValue>> tagsToCoders, + Map<TupleTag, Integer> tagsToIds, Coder<WindowedValue> inputCoder, Coder keyCoder, Map<Integer, PCollectionView> transformedSideInputs); @@ -360,15 +362,27 @@ class FlinkStreamingTransformTranslators { WindowingStrategy windowingStrategy = input.getWindowingStrategy(); Map<TupleTag, OutputTag<WindowedValue>> tagsToOutputTags = Maps.newHashMap(); + Map<TupleTag, Coder<WindowedValue>> tagsToCoders = Maps.newHashMap(); + + // We associate output tags with ids, the Integer is easier to serialize than TupleTag. + // The return map of AppliedPTransform.getOutputs() is an ImmutableMap, its implementation is + // RegularImmutableMap, its entrySet order is the same with the order of insertion. + // So we can use the original AppliedPTransform.getOutputs() to produce deterministic ids. 
+ Map<TupleTag, Integer> tagsToIds = Maps.newHashMap(); + int idCount = 0; + tagsToIds.put(mainOutputTag, idCount++); for (Map.Entry<TupleTag, PValue> entry : outputs.entrySet()) { if (!tagsToOutputTags.containsKey(entry.getKey())) { tagsToOutputTags.put( entry.getKey(), - new OutputTag<WindowedValue
[jira] [Updated] (BEAM-2709) Add TezRunner
[ https://issues.apache.org/jira/browse/BEAM-2709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He updated BEAM-2709: - Component/s: runner-tez > Add TezRunner > - > > Key: BEAM-2709 > URL: https://issues.apache.org/jira/browse/BEAM-2709 > Project: Beam > Issue Type: New Feature > Components: runner-ideas, runner-tez >Reporter: Brandon Scheller >Assignee: Brandon Scheller > > Add a TezRunner to Beam -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-165) Add Hadoop MapReduce runner
[ https://issues.apache.org/jira/browse/BEAM-165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He reassigned BEAM-165: --- Assignee: Pei He (was: Jean-Baptiste Onofré) Component/s: runner-mapreduce Issue Type: New Feature (was: Wish) > Add Hadoop MapReduce runner > --- > > Key: BEAM-165 > URL: https://issues.apache.org/jira/browse/BEAM-165 > Project: Beam > Issue Type: New Feature > Components: runner-ideas, runner-mapreduce >Reporter: Jean-Baptiste Onofré >Assignee: Pei He > > I think a MapReduce runner could be a good addition to Beam. It would allow > users to smoothly "migrate" from MapReduce to Spark or Flink. > Of course, the MapReduce runner will run in batch mode (not stream). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2714) ParDo.getAdditionalInputs() return the input of View.CreatePCollectionView instead of output.
[ https://issues.apache.org/jira/browse/BEAM-2714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112072#comment-16112072 ] Pei He commented on BEAM-2714: -- Just to clarify: with the Runner API, there will be no view transform (neither primitive nor composite), and runners need to look at the ViewFn in the ParDo payload to determine whether the input PCollections need to be materialized. Is that right? > ParDo.getAdditionalInputs() return the input of View.CreatePCollectionView > instead of output. > - > > Key: BEAM-2714 > URL: https://issues.apache.org/jira/browse/BEAM-2714 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Pei He >Assignee: Thomas Groh >Priority: Critical > Fix For: Not applicable > > Attachments: view-tag.png > > > For example, input-pc -> View.AsIterable -> view.out -> ParDo > getInputs() in TransformHierarchy.Node for ParDo will return input-pc instead > of view.out. > I think the code here should make sure the output of view is returned: > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L147 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (BEAM-2714) ParDo.getAdditionalInputs() return the input of View.CreatePCollectionView instead of output.
[ https://issues.apache.org/jira/browse/BEAM-2714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111016#comment-16111016 ] Pei He edited comment on BEAM-2714 at 8/2/17 2:38 PM: -- I use TransformHierarchy.Node getInputs() and getOutputs() to setup this diagram: see the attached image: view-tag.png The marked edge is wrong. was (Author: pei...@gmail.com): I use TransformHierarchy.Node getInputs() and getOutputs() to setup this diagram: !view-tag.png|thumbnail! The marked edge is wrong. > ParDo.getAdditionalInputs() return the input of View.CreatePCollectionView > instead of output. > - > > Key: BEAM-2714 > URL: https://issues.apache.org/jira/browse/BEAM-2714 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Pei He >Assignee: Thomas Groh >Priority: Critical > Attachments: view-tag.png > > > For example, input-pc -> View.AsIterable -> view.out -> ParDo > getInputs() in TransformHierarchy.Node for ParDo will return input-pc instead > of view.out. > I think the code here should make sure the output of view is returned: > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L147 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (BEAM-2714) ParDo.getAdditionalInputs() return the input of View.CreatePCollectionView instead of output.
[ https://issues.apache.org/jira/browse/BEAM-2714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111016#comment-16111016 ] Pei He edited comment on BEAM-2714 at 8/2/17 2:37 PM: -- I use TransformHierarchy.Node getInputs() and getOutputs() to setup this diagram: !view-tag.png|thumbnail! The marked edge is wrong. was (Author: pei...@gmail.com): I use TransformHierarchy.Node getInputs() and getOutputs() to setup this diagram. The marked edge is wrong. > ParDo.getAdditionalInputs() return the input of View.CreatePCollectionView > instead of output. > - > > Key: BEAM-2714 > URL: https://issues.apache.org/jira/browse/BEAM-2714 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Pei He >Assignee: Thomas Groh >Priority: Critical > Attachments: view-tag.png > > > For example, input-pc -> View.AsIterable -> view.out -> ParDo > getInputs() in TransformHierarchy.Node for ParDo will return input-pc instead > of view.out. > I think the code here should make sure the output of view is returned: > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L147 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2714) ParDo.getAdditionalInputs() return the input of View.CreatePCollectionView instead of output.
[ https://issues.apache.org/jira/browse/BEAM-2714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pei He updated BEAM-2714: - Attachment: view-tag.png I use TransformHierarchy.Node getInputs() and getOutputs() to setup this diagram. The marked edge is wrong. > ParDo.getAdditionalInputs() return the input of View.CreatePCollectionView > instead of output. > - > > Key: BEAM-2714 > URL: https://issues.apache.org/jira/browse/BEAM-2714 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Pei He >Assignee: Thomas Groh >Priority: Critical > Attachments: view-tag.png > > > For example, input-pc -> View.AsIterable -> view.out -> ParDo > getInputs() in TransformHierarchy.Node for ParDo will return input-pc instead > of view.out. > I think the code here should make sure the output of view is returned: > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L147 -- This message was sent by Atlassian JIRA (v6.4.14#64029)