[GitHub] incubator-beam pull request #1283: [BEAM-896] adjust ReadSourceITCase to exc...
Github user mxm closed the pull request at: https://github.com/apache/incubator-beam/pull/1283 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1283: [BEAM-896] adjust ReadSourceITCase to exc...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/1283 [BEAM-896] adjust ReadSourceITCase to exclude Beam temporary files This should fix the test failures in `ReadSourceITCase` caused by #1050. @dhalperi Wouldn't it be nice to have the temporary folder prefix somewhere accessible as a static variable? You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-896 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1283.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1283 commit eb1fcb35b556ceb099324eb956716f31b5badc61 Author: Maximilian Michels <m...@apache.org> Date: 2016-11-04T09:50:18Z [BEAM-896] adjust ReadSourceITCase to exclude Beam temporary files
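The fix amounts to ignoring Beam's temporary output when the test reads its results back. A minimal sketch of that filtering, assuming a hypothetical shared constant `TEMP_PREFIX` of the kind suggested to @dhalperi. The actual prefix introduced by #1050 is not given here, so `.temp-beam-` is only a placeholder:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch: list output files while skipping temporary files or folders left by a sink. */
final class OutputFiles {
  // Hypothetical prefix: a placeholder for the shared static constant suggested in the PR.
  static final String TEMP_PREFIX = ".temp-beam-";

  private OutputFiles() {}

  /** Returns the sorted names of regular files in {@code dir} whose names lack TEMP_PREFIX. */
  static List<String> nonTemporaryFiles(File dir) {
    List<String> names = new ArrayList<>();
    File[] entries = dir.listFiles();
    if (entries == null) {
      return names; // not a directory, or the listing failed
    }
    for (File entry : entries) {
      // Directories (including a temporary folder) are skipped by isFile().
      if (entry.isFile() && !entry.getName().startsWith(TEMP_PREFIX)) {
        names.add(entry.getName());
      }
    }
    Collections.sort(names);
    return names;
  }
}
```

Keeping the prefix in one static constant would let the sink and its tests agree on the name instead of hard-coding it in two places.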
[2/2] incubator-beam git commit: This closes #1093
This closes #1093 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b5ff4c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b5ff4c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b5ff4c4 Branch: refs/heads/master Commit: 6b5ff4c4aad5e4d1419b1a147153b0f8d72324ae Parents: a2c342c 76434df Author: Maximilian Michels Authored: Tue Oct 18 16:59:58 2016 +0200 Committer: Maximilian Michels Committed: Tue Oct 18 16:59:58 2016 +0200 -- .../flink/FlinkDetachedRunnerResult.java| 76 .../apache/beam/runners/flink/FlinkRunner.java | 9 ++- .../beam/runners/flink/FlinkRunnerResult.java | 11 +-- .../beam/runners/flink/TestFlinkRunner.java | 9 ++- 4 files changed, 91 insertions(+), 14 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-593] avoid throwing Exception in waitUntilFinish
Repository: incubator-beam Updated Branches: refs/heads/master a2c342cfd -> 6b5ff4c4a [BEAM-593] avoid throwing Exception in waitUntilFinish The current implementation of Flink's PipelineResult assumes that the pipeline has already been processed. Hence, we can return State.DONE when waitUntilFinish is called. Additionally, we introduce a PipelineResult for detached execution which returns State.UNKNOWN for now. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76434dff Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76434dff Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76434dff Branch: refs/heads/master Commit: 76434dff650196c74afdeac917d8ceddb2550079 Parents: a2c342c Author: Maximilian Michels Authored: Thu Oct 13 14:01:06 2016 +0200 Committer: Maximilian Michels Committed: Tue Oct 18 16:59:49 2016 +0200 -- .../flink/FlinkDetachedRunnerResult.java| 76 .../apache/beam/runners/flink/FlinkRunner.java | 9 ++- .../beam/runners/flink/FlinkRunnerResult.java | 11 +-- .../beam/runners/flink/TestFlinkRunner.java | 9 ++- 4 files changed, 91 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76434dff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java new file mode 100644 index 000..6adcf07 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.io.IOException; + +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.commons.lang.NotImplementedException; +import org.joda.time.Duration; + + +/** + * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink. + * In detached execution, results and job execution are currently unavailable. 
+ */ +public class FlinkDetachedRunnerResult implements PipelineResult { + + FlinkDetachedRunnerResult() {} + + @Override + public State getState() { +return State.UNKNOWN; + } + + @Override + public AggregatorValues getAggregatorValues(final Aggregator aggregator) + throws AggregatorRetrievalException { +throw new AggregatorRetrievalException( +"Accumulators can't be retrieved for detached Job executions.", +new NotImplementedException()); + } + + @Override + public MetricResults metrics() { +throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics."); + } + + @Override + public State cancel() throws IOException { +throw new UnsupportedOperationException("Cancelling is not yet supported."); + } + + @Override + public State waitUntilFinish() { +return State.UNKNOWN; + } + + @Override + public State waitUntilFinish(Duration duration) { +return State.UNKNOWN; + } + + @Override + public String toString() { +return "FlinkDetachedRunnerResult{}"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76434dff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 932952d..12e21c7 100644 ---
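In detached execution the client hands the job to the cluster and returns immediately, so the result object has nothing to observe. A minimal sketch of that contract, using a hypothetical `DetachedResult` class with a local stand-in `State` enum rather than Beam's own `PipelineResult` types: state queries answer `UNKNOWN` instead of blocking, and operations that would need a handle to the running job fail fast.

```java
/** Sketch of a result for a job that was submitted without waiting for it to finish. */
final class DetachedResult {
  /** Hypothetical local stand-in for the PipelineResult.State values used here. */
  enum State { UNKNOWN, RUNNING, DONE, FAILED, CANCELLED }

  /** The client never hears back from the cluster, so the state stays unknown. */
  State getState() {
    return State.UNKNOWN;
  }

  /** Returns immediately: there is no job handle to block on. */
  State waitUntilFinish() {
    return State.UNKNOWN;
  }

  /** Needs a handle to the running job, which detached submission does not keep. */
  State cancel() {
    throw new UnsupportedOperationException("Cancelling is not yet supported.");
  }
}
```

Callers should read `UNKNOWN` as "not observable from the client", not as a failure.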
[GitHub] incubator-beam pull request #1093: [BEAM-593] avoid throwing Exception in wa...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/1093 [BEAM-593] avoid throwing Exception in waitUntilFinish The current implementation of Flink's `PipelineResult` assumes that the pipeline has already been processed. Hence, we can return `State.DONE` when `waitUntilFinish()` is called. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-593 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1093.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1093 commit c4b78841a82951b5f7c3e1c7763e078317bb3a2f Author: Maximilian Michels <m...@apache.org> Date: 2016-10-13T12:01:06Z [BEAM-593] avoid throwing Exception in waitUntilFinish The current implementation of Flink's PipelineResult assumes that the pipeline has already been processed. Hence, we can return State.DONE when waitUntilFinish is called.
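The argument in the PR is about ordering: in attached mode the runner's `execute()` call blocks until the Flink job has terminated, so any result object built afterwards describes a finished pipeline. A sketch of that ordering, with a hypothetical `AttachedRunner` whose `Runnable` stands in for the blocking job submission:

```java
/** Sketch: a result that is only constructed after a blocking execution has returned. */
final class AttachedRunner {
  /** Hypothetical local stand-in for the PipelineResult.State values used here. */
  enum State { UNKNOWN, RUNNING, DONE }

  static final class Result {
    private final long netRuntimeMillis;

    Result(long netRuntimeMillis) {
      this.netRuntimeMillis = netRuntimeMillis;
    }

    /** Execution already completed before this object existed, so there is nothing to wait for. */
    State waitUntilFinish() {
      return State.DONE;
    }

    long getNetRuntimeMillis() {
      return netRuntimeMillis;
    }
  }

  /** Runs the job synchronously (stand-in for a blocking execute()) and then wraps the outcome. */
  static Result run(Runnable blockingJob) {
    long start = System.nanoTime();
    blockingJob.run(); // returns only when the job is done; a failure surfaces as an exception
    long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
    return new Result(elapsedMillis);
  }
}
```

Because a failed job never reaches the `new Result(...)` line, returning `DONE` unconditionally is consistent with this ordering.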
[1/2] incubator-beam git commit: Fix inconsistency in formatting logs: leaveCompositeTransform always decrements depth, but enterCompositeTransform increments depth only on ENTER_TRANSFORM
Repository: incubator-beam Updated Branches: refs/heads/master 13b45895e -> 73226168a Fix inconsistency in formatting logs: leaveCompositeTransform always decrements depth, but enterCompositeTransform increments depth only on ENTER_TRANSFORM Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cea201ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cea201ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cea201ea Branch: refs/heads/master Commit: cea201eaaea24d8cc1e117645d1c81f379beeb41 Parents: 98da6e8 Author: Alexey Diomin Authored: Wed Aug 31 18:17:01 2016 +0400 Committer: Alexey Diomin Committed: Wed Aug 31 18:17:54 2016 +0400 -- .../runners/flink/translation/FlinkBatchPipelineTranslator.java| 2 +- .../flink/translation/FlinkStreamingPipelineTranslator.java| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cea201ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java index 66c48b0..1cb604f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java @@ -65,6 +65,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { @Override public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); +this.depth++; BatchTransformTranslator translator = getTranslator(node); @@ -73,7 +74,6 @@ public class
FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { LOG.info(genSpaces(this.depth) + "translated-" + formatNodeName(node)); return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } else { - this.depth++; return CompositeBehavior.ENTER_TRANSFORM; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cea201ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java index 284cd23..e5c0d76 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java @@ -52,6 +52,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { @Override public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); +this.depth++; PTransform transform = node.getTransform(); if (transform != null) { @@ -64,7 +65,6 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } } -this.depth++; return CompositeBehavior.ENTER_TRANSFORM; }
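The fix restores symmetry. Per the commit message, `leaveCompositeTransform` always decrements the depth, so `enterCompositeTransform` must always increment it, including when it returns `DO_NOT_ENTER_TRANSFORM`. A self-contained sketch with a hypothetical `Node` tree and visitor (not Beam's `TransformTreeNode`), assuming the traversal calls `leave` after every `enter`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: indentation depth stays balanced when enter and leave are symmetric. */
final class DepthTracking {
  enum Behavior { ENTER, DO_NOT_ENTER }

  /** Hypothetical composite node; translatable nodes are handled whole, not descended into. */
  static final class Node {
    final String name;
    final boolean translatable;
    final List<Node> children;

    Node(String name, boolean translatable, Node... children) {
      this.name = name;
      this.translatable = translatable;
      this.children = Arrays.asList(children);
    }
  }

  int depth = 0;
  final List<String> log = new ArrayList<>();

  Behavior enter(Node node) {
    log.add(spaces(depth) + "enter " + node.name);
    depth++; // always increment, mirroring the unconditional decrement in leave()
    return node.translatable ? Behavior.DO_NOT_ENTER : Behavior.ENTER;
  }

  void leave(Node node) {
    depth--;
    log.add(spaces(depth) + "leave " + node.name);
  }

  /** Traversal that calls leave() after every enter(), whether or not it descended. */
  void visit(Node node) {
    if (enter(node) == Behavior.ENTER) {
      for (Node child : node.children) {
        visit(child);
      }
    }
    leave(node);
  }

  private static String spaces(int n) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < n; i++) {
      sb.append("| ");
    }
    return sb.toString();
  }
}
```

Visiting a root composite with one translatable child and one plain child leaves `depth` at 0. With the increment moved back into the `ENTER` branch, the same traversal ends at -1.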
[2/2] incubator-beam git commit: This closes #908
This closes #908 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/73226168 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/73226168 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/73226168 Branch: refs/heads/master Commit: 73226168a436f88bf650e36b45434c2dbe399ae2 Parents: 13b4589 cea201e Author: Maximilian Michels Authored: Thu Oct 13 10:43:30 2016 +0200 Committer: Maximilian Michels Committed: Thu Oct 13 10:43:30 2016 +0200 -- .../runners/flink/translation/FlinkBatchPipelineTranslator.java| 2 +- .../flink/translation/FlinkStreamingPipelineTranslator.java| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/73226168/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java --
[2/2] incubator-beam git commit: This closes #1021
This closes #1021 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1ac Branch: refs/heads/master Commit: a1acdca13f6902faad30f37a877c1e6fb218 Parents: b5853a6 59f6231 Author: Maximilian Michels Authored: Wed Sep 28 18:46:58 2016 +0200 Committer: Maximilian Michels Committed: Wed Sep 28 18:46:58 2016 +0200 -- .../wrappers/streaming/WindowDoFnOperator.java | 179 +-- 1 file changed, 165 insertions(+), 14 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator
Repository: incubator-beam Updated Branches: refs/heads/master b5853a624 -> a1acd [BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59f62318 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59f62318 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59f62318 Branch: refs/heads/master Commit: 59f623189184b225723ebd5686d912aa296ce35b Parents: 3879db0 Author: Aljoscha Krettek Authored: Wed Sep 28 11:49:54 2016 +0200 Committer: Maximilian Michels Committed: Wed Sep 28 18:46:30 2016 +0200 -- .../wrappers/streaming/WindowDoFnOperator.java | 179 +-- 1 file changed, 165 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59f62318/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 14a3ca7..e06a783 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -26,12 +29,14 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet;
import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import javax.annotation.Nullable; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.SystemReduceFn; @@ -53,12 +58,15 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; + + import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.joda.time.Instant; @@ -69,7 +77,8 @@ import org.joda.time.Instant; * @param */ public class WindowDoFnOperator -extends DoFnOperator , KV , WindowedValue >> { +extends DoFnOperator , KV , WindowedValue >> +implements Triggerable { private final Coder keyCoder; private final TimerInternals.TimerDataCoder timerCoder; @@ -77,6 +86,11 @@ public class WindowDoFnOperator private transient Set > watermarkTimers; private transient Queue > watermarkTimersQueue; + private transient Queue > processingTimeTimersQueue; + private transient Set > processingTimeTimers; + private transient Multiset processingTimeTimerTimestamps; + private transient Map processingTimeTimerFutures; + private FlinkStateInternals stateInternals; private final SystemReduceFn systemReduceFn; @@ -151,6 +165,24 @@ public class WindowDoFnOperator }); } +if (processingTimeTimers == null) { + processingTimeTimers = new HashSet<>(); + processingTimeTimerTimestamps = HashMultiset.create(); + 
processingTimeTimersQueue = new PriorityQueue<>( + 10, + new Comparator >() { +@Override +public int compare( +
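The new fields suggest one way to layer logical timers on top of Flink's processing-time triggers: a set deduplicates identical timers, a priority queue ordered by timestamp yields them in firing order, and a per-timestamp count (the `Multiset`) means only one physical trigger is needed per distinct timestamp. The sketch below is one reading of that design in plain Java, not the operator's actual code. `Timer`, `register` and `fire` are hypothetical names, a `Map<Long, Integer>` replaces Guava's `HashMultiset`, and the `ScheduledFuture` bookkeeping is omitted:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

/** Sketch: many logical processing-time timers, at most one physical trigger per timestamp. */
final class ProcessingTimeTimers {
  /** Hypothetical logical timer: a key plus a firing time in milliseconds. */
  static final class Timer {
    final String key;
    final long timestamp;

    Timer(String key, long timestamp) {
      this.key = key;
      this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Timer)) {
        return false;
      }
      Timer t = (Timer) o;
      return key.equals(t.key) && timestamp == t.timestamp;
    }

    @Override
    public int hashCode() {
      return 31 * key.hashCode() + Long.hashCode(timestamp);
    }
  }

  private final Set<Timer> timers = new HashSet<>(); // deduplicates identical registrations
  private final PriorityQueue<Timer> queue =
      new PriorityQueue<>(10, (Timer a, Timer b) -> Long.compare(a.timestamp, b.timestamp));
  private final Map<Long, Integer> timestampCounts = new HashMap<>(); // stands in for a Multiset
  int physicalRegistrations = 0; // how often a real trigger would have been scheduled

  void register(Timer timer) {
    if (!timers.add(timer)) {
      return; // identical timer already registered
    }
    queue.add(timer);
    int count = timestampCounts.merge(timer.timestamp, 1, Integer::sum);
    if (count == 1) {
      physicalRegistrations++; // first timer at this timestamp: schedule one physical trigger
    }
  }

  /** Called when a physical trigger fires: fires every logical timer due at or before now. */
  int fire(long now) {
    int fired = 0;
    while (!queue.isEmpty() && queue.peek().timestamp <= now) {
      Timer t = queue.poll();
      timers.remove(t);
      // Decrement the count for this timestamp, dropping the entry when it reaches zero.
      timestampCounts.computeIfPresent(
          t.timestamp, (k, v) -> v > 1 ? Integer.valueOf(v - 1) : null);
      fired++;
    }
    return fired;
  }
}
```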
[3/3] incubator-beam git commit: This closes #967
This closes #967 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3879db03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3879db03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3879db03 Branch: refs/heads/master Commit: 3879db03657dd9331977313d5f3ab30d5f163b60 Parents: db47c63 f3f2a97 Author: Maximilian Michels Authored: Tue Sep 27 11:15:55 2016 +0200 Committer: Maximilian Michels Committed: Wed Sep 28 11:16:37 2016 +0200 -- .../streaming/io/UnboundedSourceWrapper.java| 96 .../streaming/UnboundedSourceWrapperTest.java | 10 ++ 2 files changed, 88 insertions(+), 18 deletions(-) --
[2/3] incubator-beam git commit: fix potential NPE in checkpointing of UnboundedSourceWrapper
fix potential NPE in checkpointing of UnboundedSourceWrapper This moves all the initialization code to the open() method which ensures that no snapshot can occur before the state has been initialized correctly. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f3f2a977 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f3f2a977 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f3f2a977 Branch: refs/heads/master Commit: f3f2a9779a5c355a5902a783f3e72609ff71717f Parents: cf14e80 Author: Maximilian Michels Authored: Fri Sep 16 18:42:43 2016 +0200 Committer: Maximilian Michels Committed: Wed Sep 28 11:14:21 2016 +0200 -- .../streaming/io/UnboundedSourceWrapper.java| 39 .../streaming/UnboundedSourceWrapperTest.java | 3 ++ 2 files changed, 27 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3f2a977/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 64cf703..68a83e8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -91,6 +91,7 @@ public class UnboundedSourceWrapper< private transient List localReaders; /** + * Flag to indicate whether the source is running. * Initialize here and not in run() to prevent races where we cancel a job before run() is * ever called or run() is called after cancel().
*/ @@ -154,19 +155,17 @@ public class UnboundedSourceWrapper< splitSources = source.generateInitialSplits(parallelism, pipelineOptions); } - @Override - public void run(SourceContext ctx) throws Exception { -if (!(ctx instanceof StreamSource.ManualWatermarkContext)) { - throw new RuntimeException( - "Cannot emit watermarks, this hints at a misconfiguration/bug."); -} -context = (StreamSource.ManualWatermarkContext ) ctx; + /** + * Initialize and restore state before starting execution of the source. + */ + @Override + public void open(Configuration parameters) throws Exception { runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); // figure out which split sources we're responsible for -int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); -int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); +int subtaskIndex = runtimeContext.getIndexOfThisSubtask(); +int numSubtasks = runtimeContext.getNumberOfParallelSubtasks(); localSplitSources = new ArrayList<>(); localReaders = new ArrayList<>(); @@ -183,12 +182,12 @@ public class UnboundedSourceWrapper< new Function< KV, CheckpointMarkT>, UnboundedSource >() { -@Override -public UnboundedSource apply( -KV, CheckpointMarkT> input) { - return input.getKey(); -} - }); +@Override +public UnboundedSource apply( +KV, CheckpointMarkT> input) { + return input.getKey(); +} + }); for (KV, CheckpointMarkT> restored: restoredState) { @@ -215,6 +214,16 @@ public class UnboundedSourceWrapper< subtaskIndex, numSubtasks, localSplitSources); + } + + @Override + public void run(SourceContext ctx) throws Exception { +if (!(ctx instanceof StreamSource.ManualWatermarkContext)) { + throw new RuntimeException( + "Cannot emit watermarks, this hints at a misconfiguration/bug."); +} + +context = (StreamSource.ManualWatermarkContext ) ctx; if (localReaders.size() == 0) { // do nothing, but still look busy ... 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3f2a977/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java -- diff --git
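Presumably the NPE arises because a snapshot can be requested while `run()` is still setting up, so `snapshotState()` may observe `null` fields. The commit moves that setup into `open()`, which, per the commit message, guarantees that no snapshot happens before the state is initialized. A minimal sketch of the resulting lifecycle with a hypothetical `SourceLifecycle` class (plain Java, not Flink's `RichParallelSourceFunction`):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch: state read by snapshotState() is created in open(), not in run(). */
final class SourceLifecycle {
  private List<String> localReaders; // null until open() has been called

  /** Invoked by the runtime before run(); sets up everything a snapshot may touch. */
  void open(List<String> assignedSplits) {
    localReaders = new ArrayList<>(assignedSplits);
  }

  /** Emits records; it no longer performs initialization that a snapshot depends on. */
  void run() {
    if (localReaders == null) {
      throw new IllegalStateException("open() must be called before run()");
    }
    // ... read from localReaders and emit elements ...
  }

  /** May be triggered before or during run(); safe once open() has completed. */
  List<String> snapshotState(long checkpointId) {
    return Collections.unmodifiableList(new ArrayList<>(localReaders));
  }
}
```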
[1/3] incubator-beam git commit: [BEAM-283] finalize CheckpointMarks upon completed checkpoint
Repository: incubator-beam Updated Branches: refs/heads/master db47c63ab -> 3879db036 [BEAM-283] finalize CheckpointMarks upon completed checkpoint Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf14e809 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf14e809 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf14e809 Branch: refs/heads/master Commit: cf14e809d4a790c407ab7c3cf1c90bb436a86dc9 Parents: c403675 Author: Maximilian Michels Authored: Fri Sep 16 17:04:22 2016 +0200 Committer: Maximilian Michels Committed: Fri Sep 16 20:48:52 2016 +0200 -- .../streaming/io/UnboundedSourceWrapper.java| 57 ++-- .../streaming/UnboundedSourceWrapperTest.java | 7 +++ 2 files changed, 61 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cf14e809/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 7fdc816..64cf703 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -22,6 +22,8 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import java.io.ByteArrayInputStream; import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.coders.Coder; @@ -38,6 +40,7 @@ import
org.apache.beam.sdk.values.TypeDescriptor; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; @@ -54,7 +57,7 @@ import org.slf4j.LoggerFactory; public class UnboundedSourceWrapper< OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RichParallelSourceFunction -implements Triggerable, StoppableFunction, Checkpointed { +implements Triggerable, StoppableFunction, Checkpointed , CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); @@ -106,6 +109,15 @@ public class UnboundedSourceWrapper< private transient StreamSource.ManualWatermarkContext context; /** + * Pending checkpoints which have not been acknowledged yet. + */ + private transient LinkedHashMap pendingCheckpoints; + /** + * Keep a maximum of 32 checkpoints for {@code CheckpointMark.finalizeCheckpoint()}. + */ + private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32; + + /** * When restoring from a snapshot we put the restored sources/checkpoint marks here * and open in {@link #open(Configuration)}. 
*/ @@ -159,6 +171,8 @@ public class UnboundedSourceWrapper< localSplitSources = new ArrayList<>(); localReaders = new ArrayList<>(); +pendingCheckpoints = new LinkedHashMap<>(); + if (restoredState != null) { // restore the splitSources from the checkpoint to ensure consistent ordering @@ -324,7 +338,7 @@ public class UnboundedSourceWrapper< } @Override - public byte[] snapshotState(long l, long l1) throws Exception { + public byte[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { if (checkpointCoder == null) { // no checkpoint coder available in this source @@ -335,7 +349,8 @@ public class UnboundedSourceWrapper< // than we have a correct mapping of checkpoints to sources when // restoring List > checkpoints = -new ArrayList<>(); +new ArrayList<>(localSplitSources.size()); +List checkpointMarks = new ArrayList<>(localSplitSources.size()); for (int i = 0; i < localSplitSources.size(); i++) { UnboundedSource source = localSplitSources.get(i); @@
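This commit remembers the `CheckpointMark`s taken for each checkpoint id and calls `finalizeCheckpoint()` on them only once Flink reports that checkpoint as complete, keeping at most 32 pending entries. A sketch of that bookkeeping, assuming (as the `LinkedHashMap` and `Iterator` imports hint, though the truncated diff does not show it) that a completed checkpoint also discards older pending ones. `PendingCheckpoints`, `Mark` and `onSnapshot` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: remember marks per checkpoint and finalize them once the checkpoint completes. */
final class PendingCheckpoints {
  /** Hypothetical stand-in for UnboundedSource.CheckpointMark. */
  interface Mark {
    void finalizeCheckpoint();
  }

  static final int MAX_PENDING = 32;

  // Insertion order equals checkpoint order, so the eldest entry is the oldest checkpoint.
  private final LinkedHashMap<Long, List<Mark>> pending = new LinkedHashMap<>();

  void onSnapshot(long checkpointId, List<Mark> marks) {
    pending.put(checkpointId, new ArrayList<>(marks));
    // Bound memory in case completion notifications never arrive.
    while (pending.size() > MAX_PENDING) {
      Iterator<Long> it = pending.keySet().iterator();
      it.next();
      it.remove();
    }
  }

  void notifyCheckpointComplete(long checkpointId) {
    List<Mark> marks = pending.get(checkpointId);
    if (marks == null) {
      return; // unknown id, or already evicted
    }
    // Drop this checkpoint and every older one still pending.
    Iterator<Map.Entry<Long, List<Mark>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      long id = it.next().getKey();
      it.remove();
      if (id == checkpointId) {
        break;
      }
    }
    for (Mark mark : marks) {
      mark.finalizeCheckpoint();
    }
  }

  int size() {
    return pending.size();
  }
}
```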
[1/2] incubator-beam git commit: [BEAM-642] Support Flink Detached Mode for JOB execution
Repository: incubator-beam Updated Branches: refs/heads/master f62d04e22 -> 843275210 [BEAM-642] Support Flink Detached Mode for JOB execution Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc69bc48 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc69bc48 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc69bc48 Branch: refs/heads/master Commit: dc69bc48b0057f45d849d3cfec848fa066ee0854 Parents: f62d04e Author: Sumit Chawla Authored: Mon Sep 19 15:10:53 2016 -0700 Committer: Maximilian Michels Committed: Thu Sep 22 11:30:09 2016 +0200 -- .../apache/beam/runners/flink/FlinkRunner.java | 25 +--- 1 file changed, 16 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc69bc48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index d3c65c0..137fdeb 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -25,6 +25,7 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -56,6 +57,7 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.client.program.DetachedEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,18 +153,23 @@ public class FlinkRunner extends PipelineRunner { throw new RuntimeException("Pipeline execution failed", e); } -LOG.info("Execution finished in {} msecs",
result.getNetRuntime()); - -Map accumulators = result.getAllAccumulatorResults(); -if (accumulators != null && !accumulators.isEmpty()) { - LOG.info("Final aggregator values:"); +if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) { + LOG.info("Pipeline submitted in Detached mode"); + Map accumulators = Collections.emptyMap(); + return new FlinkRunnerResult(accumulators, -1L); +} else { + LOG.info("Execution finished in {} msecs", result.getNetRuntime()); + Map accumulators = result.getAllAccumulatorResults(); + if (accumulators != null && !accumulators.isEmpty()) { +LOG.info("Final aggregator values:"); - for (Map.Entry entry : result.getAllAccumulatorResults().entrySet()) { -LOG.info("{} : {}", entry.getKey(), entry.getValue()); +for (Map.Entry entry : result.getAllAccumulatorResults().entrySet()) { + LOG.info("{} : {}", entry.getKey(), entry.getValue()); +} } -} -return new FlinkRunnerResult(accumulators, result.getNetRuntime()); + return new FlinkRunnerResult(accumulators, result.getNetRuntime()); +} } /**
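The change branches on the runtime type of the `JobExecutionResult`: a detached submission yields a `DetachedEnvironment.DetachedJobExecutionResult`, for which runtime and accumulators are unavailable, so the runner returns empty accumulators and a runtime of `-1`. A sketch of the same branching with hypothetical stand-in classes in place of the Flink and Beam types:

```java
import java.util.Collections;
import java.util.Map;

/** Sketch: branch on whether an execution result came from a detached submission. */
final class ResultMapping {
  /** Hypothetical stand-in for Flink's JobExecutionResult. */
  static class ExecutionResult {
    final long netRuntime;
    final Map<String, Object> accumulators;

    ExecutionResult(long netRuntime, Map<String, Object> accumulators) {
      this.netRuntime = netRuntime;
      this.accumulators = accumulators;
    }
  }

  /** Hypothetical stand-in for the detached subclass: nothing is known about the remote job. */
  static final class DetachedExecutionResult extends ExecutionResult {
    DetachedExecutionResult() {
      super(-1L, null);
    }
  }

  /** Hypothetical summary handed back to the caller; runtimeMillis of -1 means "unknown". */
  static final class Summary {
    final Map<String, Object> accumulators;
    final long runtimeMillis;

    Summary(Map<String, Object> accumulators, long runtimeMillis) {
      this.accumulators = accumulators;
      this.runtimeMillis = runtimeMillis;
    }
  }

  static Summary summarize(ExecutionResult result) {
    if (result instanceof DetachedExecutionResult) {
      // Do not read runtime or accumulators: they are unavailable in detached mode.
      return new Summary(Collections.<String, Object>emptyMap(), -1L);
    }
    Map<String, Object> acc = result.accumulators == null
        ? Collections.<String, Object>emptyMap()
        : result.accumulators;
    return new Summary(acc, result.netRuntime);
  }
}
```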
[GitHub] incubator-beam pull request #967: [BEAM-283] finalize CheckpointMarks upon c...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/967 [BEAM-283] finalize CheckpointMarks upon completed checkpoint See the first commit for an addition to the `UnboundedSourceWrapper` that calls `finalizeCheckpoint()` on `CheckpointMark`s upon checkpoint completion. The second commit fixes a potential race condition on startup of the source wrapper when checkpointing is enabled. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-283 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/967.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #967 commit cf14e809d4a790c407ab7c3cf1c90bb436a86dc9 Author: Maximilian Michels <m...@apache.org> Date: 2016-09-16T15:04:22Z [BEAM-283] finalize CheckpointMarks upon completed checkpoint commit b82208b22adab641f3c469bef622d0a76be88d68 Author: Maximilian Michels <m...@apache.org> Date: 2016-09-16T16:42:43Z fix potential NPE in checkpointing of UnboundedSourceWrapper This moves all the initialization code to the open() method which ensures that no snapshot can occur before the state has been initialized correctly.
[2/2] incubator-beam git commit: This closes #929
This closes #929 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9326c8b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9326c8b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9326c8b Branch: refs/heads/master Commit: e9326c8b19c74a070b8ce8612af25b79dfb537ab Parents: b6205ff de6ec82 Author: Maximilian Michels Authored: Fri Sep 9 16:17:03 2016 +0200 Committer: Maximilian Michels Committed: Fri Sep 9 16:17:03 2016 +0200 -- .../wrappers/streaming/WindowDoFnOperator.java | 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9326c8b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java --
[1/2] incubator-beam git commit: [flink] initialize watermarkTimeQueue with Comparator
Repository: incubator-beam
Updated Branches:
  refs/heads/master b6205ffa3 -> e9326c8b1

[flink] initialize watermarkTimeQueue with Comparator

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de6ec823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de6ec823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de6ec823

Branch: refs/heads/master
Commit: de6ec8238f16f7505eb17ffa293208dabfa3431a
Parents: 26635d7
Author: Maximilian Michels
Authored: Wed Sep 7 16:49:38 2016 +0200
Committer: Maximilian Michels
Committed: Wed Sep 7 16:51:24 2016 +0200

--
 .../wrappers/streaming/WindowDoFnOperator.java | 12 +++-
 1 file changed, 11 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de6ec823/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 29ae6ae..075f5df 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -265,7 +265,17 @@ public class WindowDoFnOperator
 
     int numWatermarkTimers = dataIn.readInt();
     watermarkTimers = new HashSet<>(numWatermarkTimers);
-    watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
+
+    watermarkTimersQueue = new PriorityQueue<>(
+        Math.max(numWatermarkTimers, 1),
+        new Comparator >() {
+          @Override
+          public int compare(
+              Tuple2 o1,
+              Tuple2 o2) {
+            return o1.f1.compareTo(o2.f1);
+          }
+        });
 
     for (int i = 0; i < numWatermarkTimers; i++) {
       int length = dataIn.readInt();
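Commit de6ec823 passes an explicit `Comparator` to the `PriorityQueue`. Without one, `PriorityQueue` uses natural ordering and casts its elements to `Comparable`, which throws `ClassCastException` for element types that do not implement it. The sketch below reproduces both cases with a hypothetical `TimerEntry` class standing in for Flink's `Tuple2`, ordering by the second field just as `o1.f1.compareTo(o2.f1)` does in the diff.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical (key, timestamp) pair standing in for Flink's Tuple2; it is not Comparable. */
final class TimerEntry {
  final String key;
  final long timestamp;

  TimerEntry(String key, long timestamp) {
    this.key = key;
    this.timestamp = timestamp;
  }
}

class TimerQueues {
  /** Queue ordered by timestamp, so poll() returns the earliest timer first. */
  static PriorityQueue<TimerEntry> newTimerQueue(int expectedSize) {
    return new PriorityQueue<>(
        Math.max(expectedSize, 1), // the initial capacity must be at least 1
        new Comparator<TimerEntry>() {
          @Override
          public int compare(TimerEntry o1, TimerEntry o2) {
            return Long.compare(o1.timestamp, o2.timestamp);
          }
        });
  }

  /** Shows the failure mode: a queue without a Comparator cannot order TimerEntry objects. */
  static boolean naturalOrderingFails() {
    PriorityQueue<TimerEntry> queue = new PriorityQueue<>(1);
    try {
      // Depending on the JDK version, the cast to Comparable fails on the first or second add.
      queue.add(new TimerEntry("a", 1L));
      queue.add(new TimerEntry("b", 2L));
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }
}
```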
[2/5] incubator-beam git commit: [BEAM-333][flink] make bounded/unbounded sources stoppable
[BEAM-333][flink] make bounded/unbounded sources stoppable

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e2820b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e2820b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e2820b0

Branch: refs/heads/master
Commit: 7e2820b06c19d958cbf7316ae28def7fe796a360
Parents: be689df
Author: Maximilian Michels
Authored: Tue Sep 6 16:38:43 2016 +0200
Committer: Maximilian Michels
Committed: Fri Sep 9 16:06:42 2016 +0200

--
 .../wrappers/streaming/io/BoundedSourceWrapper.java   | 9 -
 .../wrappers/streaming/io/UnboundedSourceWrapper.java | 8 +++-
 2 files changed, 15 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 3cb93c0..df49a49 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -37,7 +38,8 @@ import org.slf4j.LoggerFactory;
  * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink Source.
  */
 public class BoundedSourceWrapper
-    extends RichParallelSourceFunction {
+    extends RichParallelSourceFunction
+    implements StoppableFunction {
 
   private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
 
@@ -206,6 +208,11 @@ public class BoundedSourceWrapper
     isRunning = false;
   }
 
+  @Override
+  public void stop() {
+    this.isRunning = false;
+  }
+
   /**
    * Visible so that we can check this in tests. Must not be used for anything else.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..debf52f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceWrapper<
     OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     extends RichParallelSourceFunction
-    implements Triggerable, Checkpointed {
+    implements Triggerable, StoppableFunction, Checkpointed {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
@@ -311,6 +312,11 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
+  public void stop() {
+    isRunning = false;
+  }
+
+  @Override
   public byte[] snapshotState(long l, long l1) throws Exception {
     if (checkpointCoder == null) {
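Both wrappers implement `stop()` by clearing the `isRunning` flag that drives their emitting loop, so the loop exits and `run()` returns. A self-contained sketch of that pattern, using a hypothetical `Stoppable` interface instead of Flink's `StoppableFunction` and a counter in place of emitting records:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for Flink's StoppableFunction. */
interface Stoppable {
  void stop();
}

/** Sketch of a source loop that ends when stop() is called from another thread. */
class CountingSourceSketch implements Stoppable {
  // volatile so that the write in stop() is visible to the thread executing run()
  private volatile boolean isRunning = true;
  private final AtomicLong emitted = new AtomicLong();

  void run() throws InterruptedException {
    while (isRunning) {
      emitted.incrementAndGet(); // stands in for emitting one record
      Thread.sleep(1);
    }
    // Falling out of the loop lets run() return normally.
  }

  @Override
  public void stop() {
    isRunning = false;
  }

  long emittedCount() {
    return emitted.get();
  }
}
```

The `volatile` modifier matters here: without it the running thread is not guaranteed to observe the flag change made by the thread that calls `stop()`.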
[1/5] incubator-beam git commit: [BEAM-619] keep track of local split sources in UnboundedSourceWrapper
Repository: incubator-beam Updated Branches: refs/heads/master a96ea98a4 -> b6205ffa3 [BEAM-619] keep track of local split sources in UnboundedSourceWrapper Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/145ad47d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/145ad47d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/145ad47d Branch: refs/heads/master Commit: 145ad47d9f945f816be7a91001cdf7cb3b6a7fac Parents: be689df Author: Maximilian MichelsAuthored: Wed Sep 7 13:07:15 2016 +0200 Committer: Maximilian Michels Committed: Wed Sep 7 13:15:54 2016 +0200 -- .../streaming/io/UnboundedSourceWrapper.java| 79 +++- 1 file changed, 43 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/145ad47d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 8647322..2cd06ed 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -58,7 +58,7 @@ public class UnboundedSourceWrapper< private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); /** - * Keep the options so that we can initialize the readers. + * Keep the options so that we can initialize the localReaders. */ private final SerializedPipelineOptions serializedOptions; @@ -72,13 +72,19 @@ public class UnboundedSourceWrapper< * The split sources. 
We split them in the constructor to ensure that all parallel * sources are consistent about the split sources. */ - private List> splitSources; + private final List> splitSources; /** + * The local split sources. Assigned at runtime when the wrapper is executed in parallel. + */ + private transient List > localSplitSources; + + /** + * The local split readers. Assigned at runtime when the wrapper is executed in parallel. * Make it a field so that we can access it in {@link #trigger(long)} for * emitting watermarks. */ - private transient List readers; + private transient List localReaders; /** * Initialize here and not in run() to prevent races where we cancel a job before run() is @@ -149,26 +155,15 @@ public class UnboundedSourceWrapper< int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); -List > localSources = new ArrayList<>(); - -for (int i = 0; i < splitSources.size(); i++) { - if (i % numSubtasks == subtaskIndex) { -localSources.add(splitSources.get(i)); - } -} +localSplitSources = new ArrayList<>(); +localReaders = new ArrayList<>(); -LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}", -subtaskIndex, -numSubtasks, -localSources); - -readers = new ArrayList<>(); if (restoredState != null) { // restore the splitSources from the checkpoint to ensure consistent ordering // do it using a transform because otherwise we would have to do // unchecked casts - splitSources = Lists.transform( + localSplitSources = Lists.transform( restoredState, new Function< KV, CheckpointMarkT>, @@ -182,19 +177,31 @@ public class UnboundedSourceWrapper< for (KV, CheckpointMarkT> restored: restoredState) { -readers.add( +localReaders.add( restored.getKey().createReader( serializedOptions.getPipelineOptions(), restored.getValue())); } restoredState = null; } else { - // initialize readers from scratch - for (UnboundedSource source : localSources) { - 
readers.add(source.createReader(serializedOptions.getPipelineOptions(), null)); + // initialize localReaders and localSources from scratch + for (int i = 0; i < splitSources.size(); i++) { +if (i % numSubtasks == subtaskIndex) { +
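Commit 145ad47d assigns split sources to parallel subtasks with the rule `i % numSubtasks == subtaskIndex`. The sketch below isolates that rule in a hypothetical `SplitAssignment.localSplits` helper (not part of Beam) to show that every split lands on exactly one subtask, and that subtasks beyond the number of splits receive none.

```java
import java.util.ArrayList;
import java.util.List;

class SplitAssignment {
  /**
   * Returns the splits that one parallel subtask should read, using the modulo rule
   * from the diff: split i belongs to subtask (i % numSubtasks). Each split goes to
   * exactly one subtask, and subtask loads differ by at most one split.
   */
  static <T> List<T> localSplits(List<T> splits, int subtaskIndex, int numSubtasks) {
    if (numSubtasks <= 0 || subtaskIndex < 0 || subtaskIndex >= numSubtasks) {
      throw new IllegalArgumentException("invalid subtask " + subtaskIndex + "/" + numSubtasks);
    }
    List<T> local = new ArrayList<>();
    for (int i = 0; i < splits.size(); i++) {
      if (i % numSubtasks == subtaskIndex) {
        local.add(splits.get(i));
      }
    }
    return local;
  }
}
```

Because every subtask computes its share from the same ordered split list, no coordination between subtasks is needed, provided the split order is identical everywhere; this is why the splits are computed once in the constructor.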
[5/5] incubator-beam git commit: This closes #927
This closes #927 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6205ffa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6205ffa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6205ffa Branch: refs/heads/master Commit: b6205ffa309af4e21ea2f63a211caae4961b81b1 Parents: c78db9a 4afd25a Author: Maximilian Michels Authored: Fri Sep 9 16:10:55 2016 +0200 Committer: Maximilian Michels Committed: Fri Sep 9 16:10:55 2016 +0200 -- .../streaming/io/UnboundedSourceWrapper.java | 87 -- .../streaming/UnboundedSourceWrapperTest.java | 113 +++ 2 files changed, 93 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6205ffa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java --
[4/5] incubator-beam git commit: [BEAM-619] extend test case to be parameterized
[BEAM-619] extend test case to be parameterized - extend test case with number of tasks and splits parameters Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4afd25a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4afd25a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4afd25a7 Branch: refs/heads/master Commit: 4afd25a7a85a24ff0212a4791661d3c5e105662b Parents: 145ad47 Author: Maximilian MichelsAuthored: Wed Sep 7 14:23:12 2016 +0200 Committer: Maximilian Michels Committed: Fri Sep 9 16:09:44 2016 +0200 -- .../streaming/io/UnboundedSourceWrapper.java| 8 ++ .../streaming/UnboundedSourceWrapperTest.java | 113 +++ 2 files changed, 50 insertions(+), 71 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 2cd06ed..a62a754 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -397,4 +397,12 @@ public class UnboundedSourceWrapper< public List> getSplitSources() { return splitSources; } + + /** + * Visible so that we can check this in tests. Must not be used for anything else. 
+ */ + @VisibleForTesting + public List> getLocalSplitSources() { +return localSplitSources; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java -- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 73124a9..0cc584e 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -23,6 +23,8 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -44,78 +46,43 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Tests for {@link UnboundedSourceWrapper}. */ +@RunWith(Parameterized.class) public class UnboundedSourceWrapperTest { - /** - * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per source, since we - * specify a parallelism of 1 and also at runtime tell the source that it has 1 parallel subtask. - */ - @Test - public void testWithOneReader() throws Exception { -final int numElements = 20; -final Object checkpointLock = new Object(); -PipelineOptions options = PipelineOptionsFactory.create(); - -// this source will emit exactly NUM_ELEMENTS across all parallel readers, -// afterwards it will stall. 
We check whether we also receive NUM_ELEMENTS -// elements later. -TestCountingSource source = new TestCountingSource(numElements); -UnboundedSourceWrapper , TestCountingSource.CounterMark> flinkWrapper = -new UnboundedSourceWrapper<>(options, source, 1); - -assertEquals(1, flinkWrapper.getSplitSources().size()); - -StreamSource< -WindowedValue >, -UnboundedSourceWrapper< -KV , -TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); - -setupSourceOperator(sourceOperator); - - -try { - sourceOperator.run(checkpointLock, - new Output
[3/5] incubator-beam git commit: This closes #924
This closes #924 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c78db9ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c78db9ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c78db9ad Branch: refs/heads/master Commit: c78db9addf0b08b1b4a3ca4ec5e3e7f3a0899a02 Parents: a96ea98 7e2820b Author: Maximilian Michels Authored: Fri Sep 9 16:07:57 2016 +0200 Committer: Maximilian Michels Committed: Fri Sep 9 16:07:57 2016 +0200 -- .../wrappers/streaming/io/BoundedSourceWrapper.java | 9 - .../wrappers/streaming/io/UnboundedSourceWrapper.java | 8 +++- 2 files changed, 15 insertions(+), 2 deletions(-) --
[2/4] incubator-beam git commit: [BEAM-617][flink] introduce option to set state backend
[BEAM-617][flink] introduce option to set state backend

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d4f85912
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d4f85912
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d4f85912

Branch: refs/heads/master
Commit: d4f85912effd2c04cac99d693a87bf6e2d597e9c
Parents: be689df
Author: Maximilian Michels
Authored: Tue Sep 6 16:25:32 2016 +0200
Committer: Maximilian Michels
Committed: Fri Sep 9 15:59:48 2016 +0200

--
 .../runners/flink/FlinkPipelineExecutionEnvironment.java |  7 +++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java  | 11 +++
 2 files changed, 18 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index a5d33b4..391c3f2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
@@ -221,6 +222,12 @@ public class FlinkPipelineExecutionEnvironment {
       flinkStreamEnv.enableCheckpointing(checkpointInterval);
     }
 
+    // State backend
+    final AbstractStateBackend stateBackend = options.getStateBackend();
+    if (stateBackend != null) {
+      flinkStreamEnv.setStateBackend(stateBackend);
+    }
+
     return flinkStreamEnv;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 1fb23ec..a067e76 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 
 /**
  * Options which can be used to configure a Flink PipelineRunner.
@@ -82,4 +83,14 @@ public interface FlinkPipelineOptions
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
 
+  /**
+   * Sets a state backend to store Beam's state during computation.
+   * Note: Only applicable when executing in streaming mode.
+   * @param stateBackend The state backend to use
+   */
+  @Description("Sets the state backend to use in streaming mode. "
+      + "Otherwise the default is read from the Flink config.")
+  void setStateBackend(AbstractStateBackend stateBackend);
+  AbstractStateBackend getStateBackend();
+
 }
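The new `stateBackend` option is applied only when it is non-null; otherwise the backend from Flink's configuration stays in effect. A minimal sketch of that override-if-set pattern, with hypothetical `StateBackendSketch`, `StreamEnvSketch` and `EnvironmentConfigurer` types replacing Flink's `AbstractStateBackend` and `StreamExecutionEnvironment`:

```java
/** Hypothetical stand-in for Flink's AbstractStateBackend. */
interface StateBackendSketch {
  String name();
}

/** Hypothetical stand-in for StreamExecutionEnvironment. */
class StreamEnvSketch {
  // Assumed default that would otherwise come from the Flink configuration.
  private StateBackendSketch stateBackend = () -> "from-flink-config";

  void setStateBackend(StateBackendSketch backend) {
    this.stateBackend = backend;
  }

  String stateBackendName() {
    return stateBackend.name();
  }
}

class EnvironmentConfigurer {
  /** Mirrors the null check in the diff: override the backend only when the user set one. */
  static StreamEnvSketch configure(StateBackendSketch fromOptions) {
    StreamEnvSketch env = new StreamEnvSketch();
    if (fromOptions != null) {
      env.setStateBackend(fromOptions);
    }
    return env;
  }
}
```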
[1/4] incubator-beam git commit: [flink] use exploded WindowValue in FlinkDoFnFunction
Repository: incubator-beam
Updated Branches:
  refs/heads/master 817515fe4 -> a96ea98a4

[flink] use exploded WindowValue in FlinkDoFnFunction

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3461ce21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3461ce21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3461ce21

Branch: refs/heads/master
Commit: 3461ce21b8b88de18154de777e21dc7af889f2c7
Parents: 26635d7
Author: Maximilian Michels
Authored: Wed Sep 7 14:49:02 2016 +0200
Committer: Maximilian Michels
Committed: Wed Sep 7 14:49:02 2016 +0200

--
 .../runners/flink/translation/functions/FlinkDoFnFunction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3461ce21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index ac5b345..798a23c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -86,7 +86,7 @@ public class FlinkDoFnFunction
       // is in only one window
       for (WindowedValue value : values) {
         for (WindowedValue explodedValue : value.explodeWindows()) {
-          context = context.forWindowedValue(value);
+          context = context.forWindowedValue(explodedValue);
           doFn.processElement(context);
         }
       }
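The one-line fix above passes `explodedValue` rather than `value` to `forWindowedValue`, so each `processElement` call sees a single window instead of the element's full window set. The sketch below mimics `explodeWindows()` with a hypothetical `WindowedElement` type (not Beam's `WindowedValue`) and records the windows each call would see:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified windowed value: one element assigned to several windows. */
final class WindowedElement {
  final String value;
  final List<String> windows;

  WindowedElement(String value, List<String> windows) {
    this.value = value;
    this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
  }

  /** Splits into one copy per window, analogous to WindowedValue.explodeWindows(). */
  List<WindowedElement> explodeWindows() {
    List<WindowedElement> exploded = new ArrayList<>();
    for (String window : windows) {
      exploded.add(new WindowedElement(value, Collections.singletonList(window)));
    }
    return exploded;
  }
}

class ExplodeDemo {
  /** Collects the windows each per-element call sees when the exploded copy is passed on. */
  static List<List<String>> windowsSeenByDoFn(WindowedElement value) {
    List<List<String>> seen = new ArrayList<>();
    for (WindowedElement exploded : value.explodeWindows()) {
      // Passing `value` here instead would expose all windows on every call.
      seen.add(exploded.windows);
    }
    return seen;
  }
}
```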
[3/4] incubator-beam git commit: This closes #923
This closes #923 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0399dbc7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0399dbc7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0399dbc7 Branch: refs/heads/master Commit: 0399dbc7a843e95ceacf9eff9fc751751f8f4bcc Parents: 817515f d4f8591 Author: Maximilian Michels Authored: Fri Sep 9 16:00:33 2016 +0200 Committer: Maximilian Michels Committed: Fri Sep 9 16:00:33 2016 +0200 -- .../runners/flink/FlinkPipelineExecutionEnvironment.java | 7 +++ .../apache/beam/runners/flink/FlinkPipelineOptions.java | 11 +++ 2 files changed, 18 insertions(+) --
[4/4] incubator-beam git commit: This closes #928
This closes #928 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a96ea98a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a96ea98a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a96ea98a Branch: refs/heads/master Commit: a96ea98a48c2fc7e95bdb6265ccf421355584c4d Parents: 0399dbc 3461ce2 Author: Maximilian Michels Authored: Fri Sep 9 16:01:29 2016 +0200 Committer: Maximilian Michels Committed: Fri Sep 9 16:01:29 2016 +0200 -- .../runners/flink/translation/functions/FlinkDoFnFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/3] incubator-beam-site git commit: [BEAM-102] update capability matrix
Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site e2430eb4d -> dcdd8b742

[BEAM-102] update capability matrix

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/8459da13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/8459da13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/8459da13

Branch: refs/heads/asf-site
Commit: 8459da13fccbd16e850ea455873812eea974b6dc
Parents: e2430eb
Author: Maximilian Michels
Authored: Mon Sep 5 13:04:49 2016 +0200
Committer: Maximilian Michels
Committed: Thu Sep 8 17:24:27 2016 +0200

--
 _data/capability-matrix.yml | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/8459da13/_data/capability-matrix.yml
--
diff --git a/_data/capability-matrix.yml b/_data/capability-matrix.yml
index e2f66b9..89da73b 100644
--- a/_data/capability-matrix.yml
+++ b/_data/capability-matrix.yml
@@ -118,12 +118,11 @@ categories:
           - class: dataflow
             l1: 'Yes'
             l2: some size restrictions in streaming
-            l3: Batch implemented supports a distributed implementation, but streaming mode may force some size restrictions. Neither mode is able to push lookups directly up into key-based sources.
+            l3: Batch mode supports a distributed implementation, but streaming mode may force some size restrictions. Neither mode is able to push lookups directly up into key-based sources.
           - class: flink
-            jira: BEAM-102
-            l1: 'Partially'
-            l2: no supported in streaming
-            l3: Supported in batch. Side inputs for streaming are currently WiP.
+            l1: 'Yes'
+            l2: some size restrictions in streaming
+            l3: Batch mode supports a distributed implementation, but streaming mode may force some size restrictions. Neither mode is able to push lookups directly up into key-based sources.
           - class: spark
             l1: 'Partially'
             l2: not supported in streaming
[3/3] incubator-beam-site git commit: This closes #39
This closes #39 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/dcdd8b74 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/dcdd8b74 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/dcdd8b74 Branch: refs/heads/asf-site Commit: dcdd8b742bd5f1463815638ef1c33aca3a523308 Parents: e2430eb bb1106b Author: Maximilian Michels Authored: Thu Sep 8 17:24:51 2016 +0200 Committer: Maximilian Michels Committed: Thu Sep 8 17:24:51 2016 +0200 -- _data/capability-matrix.yml | 9 - content/learn/runners/capability-matrix/index.html | 8 2 files changed, 8 insertions(+), 9 deletions(-) --
[GitHub] incubator-beam pull request #929: [flink] initialize watermarkTimeQueue with...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/929 [flink] initialize watermarkTimeQueue with Comparator This avoids a ClassCastException: without a Comparator, the PriorityQueue orders its elements by casting them to Comparable. CC @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam fix2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/929.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #929
[GitHub] incubator-beam pull request #928: [flink] use exploded WindowValue in FlinkD...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/928 [flink] use exploded WindowValue in FlinkDoFnFunction CC @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/928.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #928
[GitHub] incubator-beam pull request #924: [BEAM-333][flink] make unbounded sources s...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/924 [BEAM-333][flink] make unbounded sources stoppable You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-333 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/924.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #924 commit 557f1fe909047a72492faa67c6f7d2d24c3cf729 Author: Maximilian Michels <m...@apache.org> Date: 2016-09-06T14:38:43Z [BEAM-333][flink] make unbounded sources stoppable
[GitHub] incubator-beam pull request #923: [BEAM-617][flink] introduce option to set ...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/923 [BEAM-617][flink] introduce option to set state backend CC @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-617 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/923.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #923 commit 2eba5034e7d5abbae737025a125805af7744aefd Author: Maximilian Michels <m...@apache.org> Date: 2016-09-06T14:25:32Z [BEAM-617][flink] introduce option to set state backend
[GitHub] incubator-beam-site pull request #39: [BEAM-102] update capability matrix
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam-site/pull/39 [BEAM-102] update capability matrix This updates the matrix to the most recent development status. CC @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam-site asf-site Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam-site/pull/39.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #39 commit a36d8c54c32445485cc01d69d9e75cc76a227ebe Author: Maximilian Michels <m...@apache.org> Date: 2016-09-05T11:04:49Z [BEAM-102] update capability matrix commit 3afa3793f134d94e5d19bea650b62a0b616d5933 Author: Maximilian Michels <m...@apache.org> Date: 2016-09-05T11:05:49Z build website
[GitHub] incubator-beam pull request #821: [flink] add missing maven config to exampl...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/821 [flink] add missing maven config to example pom You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/821.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #821 commit e21b1c594f4307b2bc5e615d40e1d67f209c527b Author: Maximilian Michels <m...@apache.org> Date: 2016-08-12T15:51:02Z [flink] add missing maven config to example pom
[1/2] incubator-beam git commit: [flink] improve example section in README
Repository: incubator-beam Updated Branches: refs/heads/master 267136fb6 -> d02d2de09 [flink] improve example section in README - updates the README - repairs broken exec configuration Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2fe38770 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2fe38770 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2fe38770 Branch: refs/heads/master Commit: 2fe387707d1e115b578f5ee643bb99c0e4667ee0 Parents: cf14644 Author: Maximilian MichelsAuthored: Wed Jul 20 16:06:06 2016 +0200 Committer: Maximilian Michels Committed: Mon Jul 25 17:30:19 2016 +0200 -- runners/flink/README.md | 25 runners/flink/examples/pom.xml | 11 - .../beam/runners/flink/examples/WordCount.java | 4 ++-- 3 files changed, 21 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/README.md -- diff --git a/runners/flink/README.md b/runners/flink/README.md index 3348119..aeb1692 100644 --- a/runners/flink/README.md +++ b/runners/flink/README.md @@ -109,35 +109,40 @@ Next, let's run the classic WordCount example. It's semantically identically to the example provided with Apache Beam. Only this time, we chose the `FlinkRunner` to execute the WordCount on top of Flink. 
-Here's an excerpt from the WordCount class file: +Here's an excerpt from the [WordCount class file](examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java): ```java -Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class); +Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + // yes, we want to run WordCount with Flink options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); -p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) - .apply(new CountWords()) - .apply(TextIO.Write.named("WriteCounts") - .to(options.getOutput()) - .withNumShards(options.getNumShards())); +p.apply("ReadLines", TextIO.Read.from(options.getInput())) +.apply(new CountWords()) +.apply(MapElements.via(new FormatAsTextFn())) +.apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run(); ``` To execute the example, let's first get some sample data: -curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > examples/kinglear.txt +cd runners/flink/examples +curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt Then let's run the included WordCount locally on your machine: -cd examples -mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt +cd runners/flink/examples +mvn exec:java -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount \ + -Dinput=kinglear.txt -Doutput=wordcounts.txt Congratulations, you have run your first Apache Beam program on top of Apache Flink! +Note, that you will find a number of `wordcounts*` output files because Flink parallelizes the +WordCount computation. You may pass an additional `-Dparallelism=1` to disable parallelization and +get a single `wordcounts.txt` file. 
# Running Beam programs on a Flink cluster http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/examples/pom.xml -- diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml index b0ee2ed..355a6be 100644 --- a/runners/flink/examples/pom.xml +++ b/runners/flink/examples/pom.xml @@ -33,11 +33,10 @@ jar - -org.apache.beam.runners.flink.examples.WordCount + kinglear.txt wordcounts.txt -1 +-1 @@ -131,12 +130,10 @@ java --classpath - -${clazz} + --runner=org.apache.beam.runners.flink.FlinkRunner +--parallelism=${parallelism} --input=${input} --output=${output} ---parallelism=${parallelism} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java -- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java index
[2/2] incubator-beam git commit: This closes #724
This closes #724 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d02d2de0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d02d2de0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d02d2de0 Branch: refs/heads/master Commit: d02d2de094dda4c8fcc30e1964b4e2c514e2f557 Parents: 267136f 2fe3877 Author: Maximilian Michels Authored: Tue Jul 26 12:03:29 2016 +0200 Committer: Maximilian Michels Committed: Tue Jul 26 12:04:34 2016 +0200 -- runners/flink/README.md | 25 runners/flink/examples/pom.xml | 11 - .../beam/runners/flink/examples/WordCount.java | 4 ++-- 3 files changed, 21 insertions(+), 19 deletions(-) --
[GitHub] incubator-beam pull request #724: [flink] improve example section in README
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/724 [flink] improve example section in README - updates the README - repairs broken exec configuration You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam README Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/724.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #724 commit 2f07d4e6d5b801860be804a808ec5c87fe067c5a Author: Maximilian Michels <m...@apache.org> Date: 2016-07-20T14:06:06Z [flink] improve example section in README - updates the README - repairs broken exec configuration
[2/2] incubator-beam git commit: This closes #450
This closes #450 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be05942d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be05942d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be05942d Branch: refs/heads/master Commit: be05942da0f09a247e195d1d29513ae40e1a95e0 Parents: 60964b6 a2abc6a Author: Maximilian Michels Authored: Mon Jun 13 14:57:31 2016 +0200 Committer: Maximilian Michels Committed: Mon Jun 13 14:57:31 2016 +0200 -- .../wrappers/streaming/FlinkAbstractParDoWrapper.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) --
[1/2] incubator-beam git commit: [flink] fix potential NPE in ParDoWrapper
Repository: incubator-beam Updated Branches: refs/heads/master 60964b611 -> be05942da [flink] fix potential NPE in ParDoWrapper Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a2abc6a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a2abc6a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a2abc6a2 Branch: refs/heads/master Commit: a2abc6a249cdc4e6000d1539df6c3b5cde8d39b0 Parents: 60964b6 Author: Maximilian MichelsAuthored: Fri Jun 10 14:26:45 2016 +0200 Committer: Maximilian Michels Committed: Mon Jun 13 14:56:54 2016 +0200 -- .../wrappers/streaming/FlinkAbstractParDoWrapper.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2abc6a2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index a935011..3c37aa9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -70,18 +70,21 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl @Override public void open(Configuration parameters) throws Exception { -this.doFn.startBundle(context); } @Override public void close() throws Exception { -this.doFn.finishBundle(context); +if (this.context != null) { + // we have initialized the context + this.doFn.finishBundle(this.context); +} } @Override public void flatMap(WindowedValue value, Collector out) throws 
Exception { if (this.context == null) { this.context = new DoFnProcessContext(doFn, out); + this.doFn.startBundle(this.context); } // for each window the element belongs to, create a new copy here. @@ -98,7 +101,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl private void processElement(WindowedValue value) throws Exception { this.context.setElement(value); -doFn.processElement(context); +doFn.processElement(this.context); } private class DoFnProcessContext extends DoFn .ProcessContext {
[GitHub] incubator-beam pull request #450: [flink] fix potential NPE in ParDoWrapper
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/450 [flink] fix potential NPE in ParDoWrapper Just discovered this while checking for correct execution of the bundle life cycle. This fixes potential NPEs in the ParDo translation wrapper. `startBundle(context)` receives `null` as `context` before the first element has been read. Similarly, `finishBundle(context)` receives `null` as `context` if no elements have been read. CC @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam pr Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/450.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #450 commit ef9f4c0bb4aecde5a99cd80e96223de306a05455 Author: Maximilian Michels <m...@apache.org> Date: 2016-06-13T12:26:45Z [flink] fix potential NPE in ParDoWrapper
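The bundle lifecycle described in this PR can be sketched as a simplified, self-contained Java model. This is not the actual `FlinkAbstractParDoWrapper`; `Context`, `LoggingFn` and `ParDoWrapperSketch` are hypothetical stand-ins. It shows the two guards from the patch: the context (and with it the bundle) is started lazily on the first element, and `finishBundle` is only called if a context exists.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for DoFn.ProcessContext; here it only records events. */
class Context {
  final List<String> log = new ArrayList<>();
}

/** Hypothetical user function whose callbacks dereference the context (NPE if it is null). */
class LoggingFn {
  void startBundle(Context c) { c.log.add("start"); }
  void processElement(Context c, String element) { c.log.add("process:" + element); }
  void finishBundle(Context c) { c.log.add("finish"); }
}

/** Sketch of the wrapper lifecycle after the fix. */
class ParDoWrapperSketch {
  private final LoggingFn fn = new LoggingFn();
  private Context context; // created lazily when the first element arrives

  void open() {
    // Intentionally empty: calling fn.startBundle(context) here would pass null.
  }

  void flatMap(String value) {
    if (context == null) {
      context = new Context();
      fn.startBundle(context); // start the bundle together with the context
    }
    fn.processElement(context, value);
  }

  void close() {
    if (context != null) {
      // only finish a bundle that was actually started
      fn.finishBundle(context);
    }
  }

  List<String> events() {
    return context == null ? new ArrayList<String>() : context.log;
  }
}
```

An operator instance that never receives an element therefore calls neither `startBundle` nor `finishBundle`, which is what avoids passing a `null` context.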
[1/5] incubator-beam git commit: [BEAM-196] provide PipelineOptions in DoFn
Repository: incubator-beam Updated Branches: refs/heads/master ffbfc66e1 -> cc448e976 [BEAM-196] provide PipelineOptions in DoFn - fixes NPE when accessing the PipelineOptions - adds a test to verify that the PipelineOptions are available Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eced106e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eced106e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eced106e Branch: refs/heads/master Commit: eced106e50ddb257524a7826ab7d27254be89da8 Parents: d10ae23 Author: Maximilian MichelsAuthored: Tue Jun 7 13:57:33 2016 +0200 Committer: Maximilian Michels Committed: Wed Jun 8 15:19:50 2016 +0200 -- .../streaming/FlinkAbstractParDoWrapper.java| 11 ++- .../beam/runners/flink/PipelineOptionsTest.java | 97 +++- 2 files changed, 100 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index 117303c..a935011 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import 
org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; @@ -37,6 +38,7 @@ import com.google.common.base.Preconditions; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import org.joda.time.Instant; import org.joda.time.format.PeriodFormat; @@ -52,7 +54,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl private final DoFn doFn; private final WindowingStrategy windowingStrategy; - private transient PipelineOptions options; + private final SerializedPipelineOptions serializedPipelineOptions; private DoFnProcessContext context; @@ -62,7 +64,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl Preconditions.checkNotNull(doFn); this.doFn = doFn; -this.options = options; +this.serializedPipelineOptions = new SerializedPipelineOptions(options); this.windowingStrategy = windowingStrategy; } @@ -107,7 +109,8 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl private WindowedValue element; -private DoFnProcessContext(DoFn function, Collector outCollector) { +private DoFnProcessContext(DoFn function, + Collector outCollector) { function.super(); super.setupDelegateAggregators(); @@ -156,7 +159,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl @Override public PipelineOptions getPipelineOptions() { - return options; + return serializedPipelineOptions.getPipelineOptions(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java -- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 
464c6df..d571f31 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -18,14 +18,29 @@ package org.apache.beam.runners.flink; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import
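Why a `transient` options field leads to an NPE, and why keeping a serialized copy fixes it, can be shown with plain Java serialization. The sketch assumes, as the change implies, that the wrapper is serialized when it is shipped to the workers. `Options`, `BrokenWrapper`, `SerializedOptionsHolder` and `RoundTrip` are hypothetical names, and the byte encoding is a placeholder: a real implementation would need a complete encoding of all options (for example JSON).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical options object that is NOT Serializable. */
class Options {
  final String jobName;
  Options(String jobName) { this.jobName = jobName; }
}

/** Before: the options sit in a transient field and are lost when the object is shipped. */
class BrokenWrapper implements Serializable {
  private static final long serialVersionUID = 1L;
  private transient Options options;
  BrokenWrapper(Options options) { this.options = options; }
  Options getOptions() { return options; }
}

/** After: keep a serializable encoding and rebuild the options lazily on first access. */
class SerializedOptionsHolder implements Serializable {
  private static final long serialVersionUID = 1L;
  private final byte[] encoded;      // survives Java serialization
  private transient Options options; // rebuilt on demand

  SerializedOptionsHolder(Options options) {
    // placeholder encoding: only the job name
    this.encoded = options.jobName.getBytes(StandardCharsets.UTF_8);
  }

  Options getOptions() {
    if (options == null) {
      options = new Options(new String(encoded, StandardCharsets.UTF_8));
    }
    return options;
  }
}

final class RoundTrip {
  private RoundTrip() {}

  /** Serialize and deserialize, as happens when a function is sent to a worker. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copy(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      return (T) in.readObject();
    }
  }
}
```

After a round trip, `BrokenWrapper.getOptions()` returns `null`, while `SerializedOptionsHolder.getOptions()` reconstructs the options from the bytes it carried along.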
[4/5] incubator-beam git commit: [BEAM-287] adjust README to changed Maven layout
[BEAM-287] adjust README to changed Maven layout Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fd0dfc7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fd0dfc7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fd0dfc7 Branch: refs/heads/master Commit: 1fd0dfc7ad61078259a5ad43b37d42b873d40090 Parents: ffbfc66 Author: Maximilian Michels Authored: Fri Jun 3 11:13:45 2016 +0200 Committer: Maximilian Michels Committed: Wed Jun 8 18:08:34 2016 +0200 -- runners/flink/README.md | 7 +-- 1 file changed, 1 insertion(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fd0dfc7/runners/flink/README.md -- diff --git a/runners/flink/README.md b/runners/flink/README.md index cce17a2..69e2abb 100644 --- a/runners/flink/README.md +++ b/runners/flink/README.md @@ -163,7 +163,7 @@ The contents of the root `pom.xml` should be slightly changed aftewards (explana org.apache.beam - flink-runner_2.10 + beam-runners-flink_2.10 0.2.0-incubating-SNAPSHOT @@ -186,11 +186,6 @@ The contents of the root `pom.xml` should be slightly changed aftewards (explana org.apache.beam.runners.flink.examples.WordCount - - - org.apache.flink:* - -
[2/5] incubator-beam git commit: [flink] improve lifecycle of ParDoBoundWrapper
[flink] improve lifecycle of ParDoBoundWrapper Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d10ae23c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d10ae23c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d10ae23c Branch: refs/heads/master Commit: d10ae23c9bc9529d04d02951bfed01bbf2957773 Parents: ffbfc66 Author: Maximilian Michels Authored: Mon Jun 6 12:40:50 2016 +0200 Committer: Maximilian Michels Committed: Wed Jun 8 15:19:50 2016 +0200 -- .../streaming/FlinkAbstractParDoWrapper.java | 18 +++--- 1 file changed, 11 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d10ae23c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index bb6ed67..117303c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -66,15 +66,21 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl this.windowingStrategy = windowingStrategy; } - private void initContext(DoFn function, Collector outCollector) { -if (this.context == null) { - this.context = new DoFnProcessContext(function, outCollector); -} + @Override + public void open(Configuration parameters) throws Exception { +this.doFn.startBundle(context); + } + + @Override + public void close() throws Exception { +this.doFn.finishBundle(context); } @Override public void flatMap(WindowedValue value,
Collector out) throws Exception { -this.initContext(doFn, out); +if (this.context == null) { + this.context = new DoFnProcessContext(doFn, out); +} // for each window the element belongs to, create a new copy here. Collection windows = value.getWindows(); @@ -90,9 +96,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl private void processElement(WindowedValue value) throws Exception { this.context.setElement(value); -this.doFn.startBundle(context); doFn.processElement(context); -this.doFn.finishBundle(context); } private class DoFnProcessContext extends DoFn .ProcessContext {
[5/5] incubator-beam git commit: This closes #415
This closes #415 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc448e97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc448e97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc448e97 Branch: refs/heads/master Commit: cc448e976d46a8a9341445e152b3b31ea8968a56 Parents: f5583cf 1fd0dfc Author: Maximilian Michels Authored: Wed Jun 8 18:09:03 2016 +0200 Committer: Maximilian Michels Committed: Wed Jun 8 18:09:03 2016 +0200 -- runners/flink/README.md | 7 +-- 1 file changed, 1 insertion(+), 6 deletions(-) --
[3/5] incubator-beam git commit: This closes #432
This closes #432 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5583cfa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5583cfa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5583cfa Branch: refs/heads/master Commit: f5583cfad6c857d919995edd385adaf0f41fd676 Parents: ffbfc66 eced106 Author: Maximilian Michels Authored: Wed Jun 8 18:04:50 2016 +0200 Committer: Maximilian Michels Committed: Wed Jun 8 18:04:50 2016 +0200 -- .../streaming/FlinkAbstractParDoWrapper.java| 29 +++--- .../beam/runners/flink/PipelineOptionsTest.java | 97 +++- 2 files changed, 111 insertions(+), 15 deletions(-) --
[GitHub] incubator-beam pull request #432: [BEAM-196] Additional fix to ensure the Pi...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/432 [BEAM-196] Additional fix to ensure the PipelineOptions are available in DoFns. Fixes a NullPointerException if `PipelineOptions` are accessed inside a DoFn and backs it up with a test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/432.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #432 commit 05287c4ddcc38378e4e8cd921b6f6394e43eed75 Author: Maximilian Michels <m...@apache.org> Date: 2016-06-07T11:57:33Z [BEAM-196] fix NPE in ParDoWrapper commit 595146d5759a6f631f057050cdbe71f9849035f9 Author: Maximilian Michels <m...@apache.org> Date: 2016-06-08T10:40:50Z [flink] improve lifecycle of ParDoBoundWrapper
[2/2] incubator-beam git commit: This closes #398
This closes #398 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ffecfda Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ffecfda Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ffecfda Branch: refs/heads/master Commit: 2ffecfda2313a609f6db70b942d6d9a8984f464a Parents: 1cd64bb 9706438 Author: Maximilian Michels Authored: Tue May 31 11:13:50 2016 +0200 Committer: Maximilian Michels Committed: Tue May 31 11:13:50 2016 +0200 -- .../translation/FlinkBatchPipelineTranslator.java | 16 +--- 1 file changed, 1 insertion(+), 15 deletions(-) --
[2/4] incubator-beam git commit: [BEAM-235] use streaming mode on unbounded sources
[BEAM-235] use streaming mode on unbounded sources This change automatically discovers the execution mode of the Pipeline during a preliminary "optimization" translation of the pipeline. When unbounded sources are discovered, the pipeline translation mode is switched to streaming. Users may still supply the streaming flag to override this behavior. Users who forget to supply the flag will automatically use streaming mode whenever they use unbounded sources. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5632bbf8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5632bbf8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5632bbf8 Branch: refs/heads/master Commit: 5632bbf8a85a7d53fcaa53535030c4b406d8a09a Parents: cca2577 Author: Maximilian Michels Authored: Mon May 30 09:59:12 2016 +0200 Committer: Maximilian Michels Committed: Mon May 30 12:11:24 2016 +0200 -- runners/flink/README.md | 9 +- .../FlinkPipelineExecutionEnvironment.java | 149 +++ .../beam/runners/flink/FlinkPipelineRunner.java | 13 +- .../FlinkBatchPipelineTranslator.java | 8 - .../translation/FlinkPipelineTranslator.java| 17 +++ .../FlinkStreamingPipelineTranslator.java | 10 -- .../PipelineTranslationOptimizer.java | 73 + .../flink/translation/TranslationMode.java | 31 .../main/java/org/apache/beam/sdk/Pipeline.java | 2 +- 9 files changed, 186 insertions(+), 126 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/README.md -- diff --git a/runners/flink/README.md b/runners/flink/README.md index 7418f16..457e2a6 100644 --- a/runners/flink/README.md +++ b/runners/flink/README.md @@ -27,11 +27,14 @@ and sinks or use the provided support for Apache Kafka.
### Seamless integration -To execute a Beam program in streaming mode, just enable streaming in the `PipelineOptions`: +The Flink Runner decides to use batch or streaming execution mode based on whether programs use +unbounded sources. When unbounded sources are used, it executes in streaming mode, otherwise it +uses the batch execution mode. -options.setStreaming(true); +If you wish to explicitly enable streaming mode, please set the streaming flag in the +`PipelineOptions`: -That's it. If you prefer batched execution, simply disable streaming mode. +options.setStreaming(true); ## Batch http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index d31d790..4cd8fb3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.flink; import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator; import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator; import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator; +import org.apache.beam.runners.flink.translation.PipelineTranslationOptimizer; +import org.apache.beam.runners.flink.translation.TranslationMode; import org.apache.beam.sdk.Pipeline; import com.google.common.base.Preconditions; @@ -39,7 +41,7 @@ import java.util.List; * Depending on if the job is a Streaming or Batch processing one, it creates * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}), * the 
necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or - * {@link FlinkStreamingPipelineTranslator})to transform the Beam job into a Flink one, and + * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and * executes the (translated) job. */ public class FlinkPipelineExecutionEnvironment { @@ -57,7 +59,6 @@ public class FlinkPipelineExecutionEnvironment { */ private ExecutionEnvironment flinkBatchEnv; - /** * The Flink Streaming execution environment. This is instantiated to either a * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or @@ -67,51 +68,13 @@ public class
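The detection step described in the commit message amounts to a traversal that looks for unbounded reads. Below is a minimal sketch over a hypothetical tree of `Node`s; it is not Beam's pipeline visitor API, and `Mode` is a stand-in rather than the `TranslationMode` type the commit adds. An explicit streaming flag always wins; otherwise any unbounded read switches the mode to streaming.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical execution modes (stand-in, not the commit's TranslationMode). */
enum Mode { BATCH, STREAMING }

/** Hypothetical, simplified pipeline node; real pipelines form a graph, not a tree. */
class Node {
  final String name;
  final boolean unboundedRead;
  final List<Node> children;

  Node(String name, boolean unboundedRead, Node... children) {
    this.name = name;
    this.unboundedRead = unboundedRead;
    this.children = Arrays.asList(children);
  }
}

final class ModeDetector {
  private ModeDetector() {}

  /** An explicit streaming flag always wins; otherwise any unbounded read forces streaming. */
  static Mode detect(Node root, boolean streamingFlag) {
    if (streamingFlag) {
      return Mode.STREAMING;
    }
    return containsUnboundedRead(root) ? Mode.STREAMING : Mode.BATCH;
  }

  // depth-first search for the first unbounded read
  private static boolean containsUnboundedRead(Node node) {
    if (node.unboundedRead) {
      return true;
    }
    for (Node child : node.children) {
      if (containsUnboundedRead(child)) {
        return true;
      }
    }
    return false;
  }
}
```

Running this check before translation is what lets a single entry point pick either the batch or the streaming translator.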
[1/4] incubator-beam git commit: [Beam-312] don't checkpoint if CheckpointCoder not available
Repository: incubator-beam Updated Branches: refs/heads/master cca2577c6 -> 36a27f538 [Beam-312] don't checkpoint if CheckpointCoder not available This skips the checkpoint logic in the UnboundedSourceWrapper if the UnboundedSource doesn't supply a CheckpointMarkCoder. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c4072ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c4072ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c4072ad Branch: refs/heads/master Commit: 9c4072ad87f25248f77e437e5bcf674aff19982b Parents: cca2577 Author: Maximilian MichelsAuthored: Mon May 30 15:59:12 2016 +0200 Committer: Maximilian Michels Committed: Sat May 28 16:17:15 2016 +0200 -- .../streaming/io/UnboundedSourceWrapper.java| 24 +--- 1 file changed, 21 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c4072ad/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index b816e2a..7f26a65 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -120,10 +120,17 @@ public class UnboundedSourceWrapper< } Coder checkpointMarkCoder = source.getCheckpointMarkCoder(); -Coder> sourceCoder = -SerializableCoder.of(new TypeDescriptor >() {}); +if (checkpointMarkCoder == null) { + LOG.info("No CheckpointMarkCoder specified for this source. 
Won't create snapshots."); + checkpointCoder = null; +} else { + + Coder> sourceCoder = + SerializableCoder.of(new TypeDescriptor >() { + }); -checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder)); + checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder)); +} // get the splits early. we assume that the generated splits are stable, // this is necessary so that the mapping of state to source is correct @@ -308,6 +315,12 @@ public class UnboundedSourceWrapper< @Override public byte[] snapshotState(long l, long l1) throws Exception { + +if (checkpointCoder == null) { + // no checkpoint coder available in this source + return null; +} + // we checkpoint the sources along with the CheckpointMarkT to ensure // than we have a correct mapping of checkpoints to sources when // restoring @@ -333,6 +346,11 @@ public class UnboundedSourceWrapper< @Override public void restoreState(byte[] bytes) throws Exception { +if (checkpointCoder == null) { + // no checkpoint coder available in this source + return; +} + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { restoredState = checkpointCoder.decode(bais, Coder.Context.OUTER); }
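The guard pattern from this patch can be sketched in isolation: when no checkpoint-mark coder is available, snapshotting returns `null` and restoring is a no-op. `MarkCoder`, `Utf8MarkCoder` and `SourceWrapperSketch` are hypothetical simplifications; Beam's `Coder` and Flink's checkpointing interfaces have richer signatures.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical minimal coder for checkpoint marks (Beam's Coder interface is richer). */
interface MarkCoder {
  byte[] encode(String mark) throws IOException;
  String decode(byte[] bytes) throws IOException;
}

/** Hypothetical coder that stores a mark as UTF-8 text. */
class Utf8MarkCoder implements MarkCoder {
  @Override
  public byte[] encode(String mark) {
    return mark.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String decode(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

/** Sketch of the guard: without a coder, snapshots are skipped and restores are ignored. */
class SourceWrapperSketch {
  private final MarkCoder checkpointCoder; // null if the source supplies no coder
  private String currentMark = "start";
  private String restoredMark;

  SourceWrapperSketch(MarkCoder checkpointCoderOrNull) {
    this.checkpointCoder = checkpointCoderOrNull;
  }

  void advanceTo(String mark) {
    currentMark = mark;
  }

  byte[] snapshotState() throws IOException {
    if (checkpointCoder == null) {
      // no checkpoint coder available for this source
      return null;
    }
    return checkpointCoder.encode(currentMark);
  }

  void restoreState(byte[] bytes) throws IOException {
    if (checkpointCoder == null) {
      // nothing could have been snapshotted, so there is nothing to restore
      return;
    }
    restoredMark = checkpointCoder.decode(bytes);
  }

  String restoredMark() {
    return restoredMark;
  }
}
```

Returning `null` mirrors the patch, where `snapshotState` returns `null` for sources without a `CheckpointMarkCoder`.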
[3/4] incubator-beam git commit: This closes #395
This closes #395 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5d2f9cd2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5d2f9cd2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5d2f9cd2 Branch: refs/heads/master Commit: 5d2f9cd29b1ea70b3e6bbd5c884260c3695262d3 Parents: cca2577 9c4072a Author: Maximilian Michels Authored: Mon May 30 12:18:00 2016 +0200 Committer: Maximilian Michels Committed: Mon May 30 12:18:00 2016 +0200 -- .../streaming/io/UnboundedSourceWrapper.java| 24 +--- 1 file changed, 21 insertions(+), 3 deletions(-) --
[4/4] incubator-beam git commit: This closes #394
This closes #394 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/36a27f53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/36a27f53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/36a27f53 Branch: refs/heads/master Commit: 36a27f53836475048db884aeaa70d496001f1f41 Parents: 5d2f9cd 5632bbf Author: Maximilian Michels Authored: Mon May 30 12:18:18 2016 +0200 Committer: Maximilian Michels Committed: Mon May 30 12:18:18 2016 +0200 -- runners/flink/README.md | 9 +- .../FlinkPipelineExecutionEnvironment.java | 149 +++ .../beam/runners/flink/FlinkPipelineRunner.java | 13 +- .../FlinkBatchPipelineTranslator.java | 8 - .../translation/FlinkPipelineTranslator.java| 17 +++ .../FlinkStreamingPipelineTranslator.java | 10 -- .../PipelineTranslationOptimizer.java | 73 + .../flink/translation/TranslationMode.java | 31 .../main/java/org/apache/beam/sdk/Pipeline.java | 2 +- 9 files changed, 186 insertions(+), 126 deletions(-) --
[GitHub] incubator-beam pull request: [Beam-312] don't checkpoint if Checkp...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/395 [Beam-312] don't checkpoint if CheckpointCoder not available This skips the checkpoint logic in the UnboundedSourceWrapper if the UnboundedSource doesn't supply a CheckpointMarkCoder. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-312 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/395.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #395 commit 9c4072ad87f25248f77e437e5bcf674aff19982b Author: Maximilian Michels <m...@apache.org> Date: 2016-05-30T13:59:12Z [Beam-312] don't checkpoint if CheckpointCoder not available This skips the checkpoint logic in the UnboundedSourceWrapper if the UnboundedSource doesn't supply a CheckpointMarkCoder.
[GitHub] incubator-beam pull request: [BEAM-235] use streaming mode on unbo...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/394 [BEAM-235] use streaming mode on unbounded sources This change automatically discovers the execution mode of the Pipeline during a preliminary "optimization" translation of the pipeline. When unbounded sources are discovered, the pipeline translation mode is switched to streaming. Users may still supply the streaming flag to override this behavior. Users who forget to supply the flag will automatically use streaming mode whenever they use unbounded sources. - TODO: update documentation You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-235 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/394.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #394 commit c9c1e732f06dd31b565fbf632634a78d38fa3add Author: Maximilian Michels <m...@apache.org> Date: 2016-05-30T07:59:12Z [BEAM-235] use streaming mode on unbounded sources This change automatically discovers the execution mode of the Pipeline during a preliminary "optimization" translation of the pipeline. When unbounded sources are discovered, the pipeline translation mode is switched to streaming. Users may still supply the streaming flag to override this behavior. Users who forget to supply the flag, will automatically use streaming mode whenever they use unbounded sources. - TODO: update documentation
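The detection described in this pull request amounts to one pass over the pipeline before the real translation. The sketch below is hypothetical: `ExecMode`, `PipelineNode` and `ModeDetector` are illustrative names, not the classes added in the PR (the diffstat only shows that `PipelineTranslationOptimizer.java` and `TranslationMode.java` were added), and the pipeline is reduced to a flat list of nodes.

```java
import java.util.List;

/** Hypothetical execution modes (assumed names, not the PR's TranslationMode). */
enum ExecMode {
  BATCH,
  STREAMING
}

/** Hypothetical, minimal view of a pipeline node: a name and whether it reads an unbounded source. */
final class PipelineNode {
  final String name;
  final boolean readsUnboundedSource;

  PipelineNode(String name, boolean readsUnboundedSource) {
    this.name = name;
    this.readsUnboundedSource = readsUnboundedSource;
  }
}

final class ModeDetector {
  private ModeDetector() {}

  /**
   * Preliminary pass before the real translation: an explicit streaming flag forces
   * streaming; otherwise any unbounded read switches the pipeline to streaming.
   */
  static ExecMode detect(List<PipelineNode> nodes, boolean streamingFlag) {
    if (streamingFlag) {
      return ExecMode.STREAMING;
    }
    for (PipelineNode node : nodes) {
      if (node.readsUnboundedSource) {
        return ExecMode.STREAMING;
      }
    }
    return ExecMode.BATCH;
  }
}
```

In this sketch, the flag can only force streaming. A pipeline that has no unbounded reads and does not set the flag stays in batch mode.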
[2/3] incubator-beam-site git commit: [BEAM-103] rebuild capability matrix
[BEAM-103] rebuild capability matrix Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/15dd578f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/15dd578f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/15dd578f Branch: refs/heads/asf-site Commit: 15dd578f15885b51b3fbef1c9deab302814b8f07 Parents: e9937d9 Author: Maximilian Michels Authored: Fri May 27 18:39:31 2016 +0200 Committer: Maximilian Michels Committed: Fri May 27 18:39:49 2016 +0200 -- content/capability-matrix/index.html | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/15dd578f/content/capability-matrix/index.html -- diff --git a/content/capability-matrix/index.html b/content/capability-matrix/index.html index 767df88..9981074 100644 --- a/content/capability-matrix/index.html +++ b/content/capability-matrix/index.html @@ -95,7 +95,7 @@ Apache Beam Capability Matrix -Last updated: 2016-05-18 20:55 PDT +Last updated: 2016-05-27 18:38 CEST Apache Beam (incubating) provides a portable API layer for building sophisticated data-parallel processing engines that may be executed across a diversity of exeuction engines, or runners. The core concepts of this layer are based upon the Beam Model (formerly referred to as the http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf;>Dataflow Model), and implemented to varying degrees in each Beam runner. To help clarify the capabilities of individual runners, we’ve created the capability matrix below. @@ -313,7 +313,7 @@ -~ (https://issues.apache.org/jira/browse/BEAM-103;>BEAM-103) + @@ -1082,7 +1082,7 @@ -Partially: parallelism 1 in streaming(https://issues.apache.org/jira/browse/BEAM-103;>BEAM-103)Fully supported in batch. In streaming, sources currently run with parallelism 1. +Yes: fully supported
[3/3] incubator-beam-site git commit: This closes #19
This closes #19 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/65770137 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/65770137 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/65770137 Branch: refs/heads/asf-site Commit: 657701378fc1935a67e5b5a5fe1427997b6c9029 Parents: 92fc275 15dd578 Author: Maximilian Michels Authored: Fri May 27 18:41:45 2016 +0200 Committer: Maximilian Michels Committed: Fri May 27 18:41:45 2016 +0200 -- _data/capability-matrix.yml | 7 +++ content/capability-matrix/index.html | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) --
[1/3] incubator-beam-site git commit: [BEAM-103] update capability matrix
Repository: incubator-beam-site Updated Branches: refs/heads/asf-site 92fc27503 -> 657701378 [BEAM-103] update capability matrix This reflects the changes of BEAM-103 in the Capability Matrix. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/e9937d9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/e9937d9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/e9937d9c Branch: refs/heads/asf-site Commit: e9937d9ce610c245ebc00c3b02e9f4dd3cb9e32d Parents: 92fc275 Author: Maximilian Michels Authored: Tue May 24 15:41:21 2016 +0200 Committer: Maximilian Michels Committed: Fri May 27 18:38:16 2016 +0200 -- _data/capability-matrix.yml | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/e9937d9c/_data/capability-matrix.yml -- diff --git a/_data/capability-matrix.yml b/_data/capability-matrix.yml index f34527d..e2f66b9 100644 --- a/_data/capability-matrix.yml +++ b/_data/capability-matrix.yml @@ -140,10 +140,9 @@ categories: l2: fully supported l3: - class: flink -jira: BEAM-103 -l1: 'Partially' -l2: parallelism 1 in streaming -l3: Fully supported in batch. In streaming, sources currently run with parallelism 1. +l1: 'Yes' +l2: fully supported +l3: - class: spark l1: 'Yes' l2: fully supported
[2/2] incubator-beam git commit: This closes #344
This closes #344 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc64d654 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc64d654 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc64d654 Branch: refs/heads/master Commit: cc64d654c5027c197eb1c1d6f64461edf1dee989 Parents: d627266 9f63000 Author: Maximilian Michels Authored: Tue May 17 19:19:12 2016 +0200 Committer: Maximilian Michels Committed: Tue May 17 19:19:12 2016 +0200 -- .../flink/translation/wrappers/SinkOutputFormat.java | 15 +-- 1 file changed, 1 insertion(+), 14 deletions(-) --
[1/2] incubator-beam git commit: [flink] replace obsolete reflection call
Repository: incubator-beam Updated Branches: refs/heads/master d627266d8 -> cc64d654c [flink] replace obsolete reflection call Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f630002 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f630002 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f630002 Branch: refs/heads/master Commit: 9f630002e235f02042c309e57ea44a163ede8bdf Parents: d627266 Author: Maximilian Michels Authored: Tue May 17 19:12:02 2016 +0200 Committer: Maximilian Michels Committed: Tue May 17 19:12:02 2016 +0200 -- .../flink/translation/wrappers/SinkOutputFormat.java | 15 +-- 1 file changed, 1 insertion(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f630002/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java index 2766a87..53e544d 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java @@ -46,23 +46,10 @@ public class SinkOutputFormat implements OutputFormat { private AbstractID uid = new AbstractID(); public SinkOutputFormat(Write.Bound transform, PipelineOptions pipelineOptions) { -this.sink = extractSink(transform); +this.sink = transform.getSink(); this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); } - private Sink extractSink(Write.Bound transform) { -// TODO possibly add a getter in the upstream -try { - Field sinkField = transform.getClass().getDeclaredField("sink"); - sinkField.setAccessible(true); -
@SuppressWarnings("unchecked") - Sink extractedSink = (Sink) sinkField.get(transform); - return extractedSink; -} catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Could not acquire custom sink field.", e); -} - } - @Override public void configure(Configuration configuration) { writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions());
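The change swaps a reflective field read for a plain getter. The sketch below contrasts the two approaches on a hypothetical `WriteBoundSketch` class, which stands in for `Write.Bound`, with the sink modelled as a `String` for brevity. Both approaches return the same value. The reflective variant, however, depends on the private field being named `sink` and fails at runtime if that field is renamed.

```java
import java.lang.reflect.Field;

/** Hypothetical transform that keeps its sink in a private field and exposes a getter. */
final class WriteBoundSketch {
  private final String sink;

  WriteBoundSketch(String sink) {
    this.sink = sink;
  }

  String getSink() {
    return sink;
  }
}

final class SinkAccess {
  private SinkAccess() {}

  /** Old approach: read the private field reflectively (breaks if the field is renamed). */
  static String viaReflection(WriteBoundSketch transform) {
    try {
      Field sinkField = transform.getClass().getDeclaredField("sink");
      sinkField.setAccessible(true);
      return (String) sinkField.get(transform);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new RuntimeException("Could not acquire custom sink field.", e);
    }
  }

  /** New approach: use the accessor, which the compiler checks. */
  static String viaGetter(WriteBoundSketch transform) {
    return transform.getSink();
  }
}
```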
[GitHub] incubator-beam pull request: [flink] replace obsolete reflection c...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/344 [flink] replace obsolete reflection call Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [X] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [X] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [X] Replace `` in the title with the actual Jira issue number, if there is one. - [X] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam reflectionCall Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/344.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #344 commit 9f630002e235f02042c309e57ea44a163ede8bdf Author: Maximilian Michels <m...@apache.org> Date: 2016-05-17T17:12:02Z [flink] replace obsolete reflection call
[2/2] incubator-beam git commit: This closes #324
This closes #324 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/123674f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/123674f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/123674f4 Branch: refs/heads/master Commit: 123674f4b5a823cc593514a131943fbcc462ab7a Parents: 6ec9e96 50edd23 Author: Maximilian Michels Authored: Thu May 12 10:57:33 2016 +0200 Committer: Maximilian Michels Committed: Thu May 12 10:57:33 2016 +0200 -- runners/flink/runner/pom.xml| 10 --- .../runners/flink/FlinkPipelineOptions.java | 30 ++-- .../beam/runners/flink/FlinkPipelineRunner.java | 4 +-- 3 files changed, 28 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/123674f4/runners/flink/runner/pom.xml --
[GitHub] incubator-beam pull request: [BEAM-272][flink] remove dependency o...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/324 [BEAM-272][flink] remove dependency on Dataflow Runner Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [X] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [X] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [X] Replace `` in the title with the actual Jira issue number, if there is one. - [X] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam BEAM-272 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/324.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #324 commit abfe73a962624001022a83ce4dd5c1697037bdbf Author: Maximilian Michels <m...@apache.org> Date: 2016-05-11T09:57:44Z [BEAM-272][flink] remove dependency on Dataflow Runner
[2/4] incubator-beam git commit: fix Flink source coder handling
fix Flink source coder handling Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aead96ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aead96ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aead96ff Branch: refs/heads/master Commit: aead96ff4c018b96a7b5ab1defb408c2a09b1be7 Parents: bc847a9 Author: Maximilian Michels Authored: Thu Apr 28 12:00:18 2016 +0200 Committer: Maximilian Michels Committed: Fri Apr 29 17:58:00 2016 +0200 -- .../FlinkStreamingTransformTranslators.java | 13 +++- .../flink/translation/types/FlinkCoder.java | 64 .../streaming/io/UnboundedFlinkSource.java | 12 +++- 3 files changed, 84 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aead96ff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index db24f9d..618727d 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.translation; import org.apache.beam.runners.flink.translation.functions.UnionCoder; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.FlinkCoder; import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat; import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper; import
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupByKeyWrapper; @@ -262,9 +263,15 @@ public class FlinkStreamingTransformTranslators { DataStream source; if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { @SuppressWarnings("unchecked") -UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource(); -source = context.getExecutionEnvironment() -.addSource(flinkSource.getFlinkSource()) +UnboundedFlinkSource flinkSourceFunction = (UnboundedFlinkSource) transform.getSource(); +DataStream flinkSource = context.getExecutionEnvironment() +.addSource(flinkSourceFunction.getFlinkSource()); + +flinkSourceFunction.setCoder( +new FlinkCoder(flinkSource.getType(), + context.getExecutionEnvironment().getConfig())); + +source = flinkSource .flatMap(new FlatMapFunction () { @Override public void flatMap(T s, Collector collector) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aead96ff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java new file mode 100644 index 000..3b1e66e --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.types; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StandardCoder; +import
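The diff constructs a `FlinkCoder` from the native source's `TypeInformation` (`flinkSource.getType()`) and the environment's `ExecutionConfig`, then sets it on the `UnboundedFlinkSource`. The likely purpose is to give Beam a coder for elements emitted by a native Flink source. The class body is truncated above, but the underlying adapter idea can be sketched as follows. The sketch assumes hypothetical `EngineSerializer` and `BytesCoder` interfaces; these are not Flink's `TypeSerializer` or Beam's `Coder` APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for an engine-side serializer (not Flink's TypeSerializer API). */
interface EngineSerializer<T> {
  void serialize(T value, DataOutput out) throws IOException;

  T deserialize(DataInput in) throws IOException;
}

/** Hypothetical coder contract on the Beam side, reduced to byte arrays. */
interface BytesCoder<T> {
  byte[] encode(T value);

  T decode(byte[] bytes);
}

/** Adapter: exposes an engine serializer through the coder contract. */
final class SerializerBackedCoder<T> implements BytesCoder<T> {
  private final EngineSerializer<T> serializer;

  SerializerBackedCoder(EngineSerializer<T> serializer) {
    this.serializer = serializer;
  }

  @Override
  public byte[] encode(T value) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(baos)) {
      serializer.serialize(value, out); // delegate the wire format to the engine serializer
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return baos.toByteArray();
  }

  @Override
  public T decode(byte[] bytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      return serializer.deserialize(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

/** Example engine serializer for longs (8 bytes, big-endian via DataOutput). */
final class LongSerializer implements EngineSerializer<Long> {
  @Override
  public void serialize(Long value, DataOutput out) throws IOException {
    out.writeLong(value);
  }

  @Override
  public Long deserialize(DataInput in) throws IOException {
    return in.readLong();
  }
}
```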
[3/4] incubator-beam git commit: Flink sink implementation
Flink sink implementation Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc847a95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc847a95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc847a95 Branch: refs/heads/master Commit: bc847a9582447372461c5cf35450ba4a4c3d490d Parents: 4fd9d74 Author: Maximilian Michels Authored: Fri Apr 22 12:33:26 2016 +0200 Committer: Maximilian Michels Committed: Fri Apr 29 17:58:00 2016 +0200 -- .../FlinkStreamingTransformTranslators.java | 33 +++- .../streaming/io/UnboundedFlinkSink.java| 175 +++ 2 files changed, 204 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc847a95/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 927c3a2..db24f9d 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -26,13 +26,16 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupBy import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundMultiWrapper; import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundWrapper; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink; import
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -64,12 +67,8 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; -import org.apache.flink.streaming.api.functions.TimestampAssigner; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.util.Collector; -import org.apache.kafka.common.utils.Time; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +103,9 @@ public class FlinkStreamingTransformTranslators { TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator()); TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); + +TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); + TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator()); TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); @@ -193,6 +195,29 @@ public class FlinkStreamingTransformTranslators { } } + private 
static class WriteSinkStreamingTranslator implements FlinkStreamingPipelineTranslator.StreamTransformTranslator { + +@Override +public void translateNode(Write.Bound transform, FlinkStreamingTranslationContext context) { + String name = transform.getName(); + PValue input = context.getInput(transform); + + Sink sink = transform.getSink(); + if (!(sink instanceof UnboundedFlinkSink)) { +throw new UnsupportedOperationException("At the time, only unbounded Flink sinks are supported."); + } + + DataStream inputDataSet = context.getInputDataStream(input); + + inputDataSet.flatMap(new FlatMapFunction () { +
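`WriteSinkStreamingTranslator` is registered in a class-keyed `TRANSLATORS` map and rejects any sink that is not an `UnboundedFlinkSink`. A self-contained sketch of this dispatch-plus-guard pattern follows. Every type in it (`WriteTransform`, `SketchSink`, `EngineNativeSink`, `OtherSink`, `TransformTranslator`, `TranslatorRegistry`) is hypothetical and only mimics the overall shape of the Flink runner code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sink hierarchy standing in for Beam's Sink and the Flink-native sink. */
interface SketchSink {}

final class EngineNativeSink implements SketchSink {}

final class OtherSink implements SketchSink {}

/** Hypothetical transform standing in for Write.Bound. */
final class WriteTransform {
  private final SketchSink sink;

  WriteTransform(SketchSink sink) {
    this.sink = sink;
  }

  SketchSink getSink() {
    return sink;
  }
}

interface TransformTranslator<T> {
  String translate(T transform);
}

final class TranslatorRegistry {
  // Translators are looked up by the exact runtime class of the transform.
  private static final Map<Class<?>, TransformTranslator<?>> TRANSLATORS = new HashMap<>();

  static {
    TRANSLATORS.put(WriteTransform.class, (TransformTranslator<WriteTransform>) t -> {
      // Guard: only engine-native sinks can be wired into the streaming job.
      if (!(t.getSink() instanceof EngineNativeSink)) {
        throw new UnsupportedOperationException("Only engine-native unbounded sinks are supported.");
      }
      return "sink attached to the stream";
    });
  }

  private TranslatorRegistry() {}

  @SuppressWarnings("unchecked")
  static <T> String translate(T transform) {
    TransformTranslator<T> translator = (TransformTranslator<T>) TRANSLATORS.get(transform.getClass());
    if (translator == null) {
      throw new UnsupportedOperationException("No translator for " + transform.getClass().getSimpleName());
    }
    return translator.translate(transform);
  }
}
```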
[4/4] incubator-beam git commit: This closes #266
This closes #266 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/661a4a89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/661a4a89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/661a4a89 Branch: refs/heads/master Commit: 661a4a893c5afa2f257969bd25d4c01c42693fac Parents: 4fd9d74 63bce07 Author: Maximilian Michels Authored: Fri Apr 29 17:58:11 2016 +0200 Committer: Maximilian Michels Committed: Fri Apr 29 17:58:11 2016 +0200 -- .../examples/streaming/KafkaIOExamples.java | 337 +++ .../FlinkStreamingTransformTranslators.java | 46 ++- .../flink/translation/types/FlinkCoder.java | 64 .../streaming/io/UnboundedFlinkSink.java| 175 ++ .../streaming/io/UnboundedFlinkSource.java | 12 +- 5 files changed, 625 insertions(+), 9 deletions(-) --
[1/4] incubator-beam git commit: add Kafka IO examples
Repository: incubator-beam Updated Branches: refs/heads/master 4fd9d74df -> 661a4a893 add Kafka IO examples Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63bce07d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63bce07d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63bce07d Branch: refs/heads/master Commit: 63bce07d8c6cc5e610ad24e915e2585fef582567 Parents: aead96f Author: Maximilian Michels Authored: Thu Apr 28 12:02:05 2016 +0200 Committer: Maximilian Michels Committed: Fri Apr 29 17:58:00 2016 +0200 -- .../examples/streaming/KafkaIOExamples.java | 337 +++ 1 file changed, 337 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63bce07d/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java -- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java new file mode 100644 index 000..af6bb35 --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.Properties; + 
+/** + * Recipes/Examples that demonstrate how to read/write data from/to Kafka. + */ +public class KafkaIOExamples { + + + private static final String KAFKA_TOPIC = "input"; // Default kafka topic to read from + private static final String KAFKA_AVRO_TOPIC = "output"; // Default kafka topic to read from + private static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact + private static final String GROUP_ID = "myGroup"; // Default groupId + private static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka + + /** + * Read/Write String data to Kafka + */ + public static class KafkaString { + +/** + * Read String data from Kafka + */ +public static class ReadStringFromKafka { + + public static void main(String[] args) { + +Pipeline p = initializePipeline(args); +KafkaOptions options = getOptions(p); + +FlinkKafkaConsumer08 kafkaConsumer = +new
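The example's defaults (`KAFKA_BROKER`, `ZOOKEEPER`, `GROUP_ID`) are intended for the Kafka 0.8 consumer, whose construction is truncated above. A minimal sketch of collecting them into a `java.util.Properties` object follows. It assumes the standard Kafka 0.8 consumer keys `bootstrap.servers`, `zookeeper.connect` and `group.id`. `KafkaPropsSketch` is a hypothetical helper and not part of `KafkaIOExamples`.

```java
import java.util.Properties;

/** Hypothetical helper; constant values copied from the example's defaults. */
final class KafkaPropsSketch {
  static final String KAFKA_BROKER = "localhost:9092"; // default Kafka broker to contact
  static final String ZOOKEEPER = "localhost:2181"; // default ZooKeeper (needed by the 0.8 consumer)
  static final String GROUP_ID = "myGroup"; // default consumer group

  private KafkaPropsSketch() {}

  /** Builds the connection properties a Kafka 0.8 consumer expects. */
  static Properties consumerProperties(String broker, String zookeeper, String groupId) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", broker);
    props.setProperty("zookeeper.connect", zookeeper);
    props.setProperty("group.id", groupId);
    return props;
  }
}
```

These properties would typically be passed to the consumer together with a topic and a deserialization schema, for example `new FlinkKafkaConsumer08<>(KAFKA_TOPIC, new SimpleStringSchema(), props)`. Check this constructor against the Flink connector version in use.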
[GitHub] incubator-beam pull request: Add option to use Flink's Kafka Write...
GitHub user mxm opened a pull request: https://github.com/apache/incubator-beam/pull/266 Add option to use Flink's Kafka Write IO This pull request adds the counterpart of the UnboundedFlinkSource, the `UnboundedFlinkSink`, which uses the `Write` API. Users have requested this multiple times, e.g. to use the Flink Kafka Producer in a Beam program. In the long run, we will rely only on Beam IO interfaces. I would like to replace the custom Flink sources and sinks as soon as the relevant connectors are in place for users. In the meantime, users can also explore the potential of Beam using native backend connectors. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/incubator-beam kafkaSink-pr Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/266.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #266 commit 3780b01f9ff0a2ffb645b961e127c50ae97affd8 Author: Maximilian Michels <m...@apache.org> Date: 2016-04-22T10:33:26Z Kafka sink implementation commit 1db316971b6ecd0a27cefb0408266c914c1f7d89 Author: Maximilian Michels <m...@apache.org> Date: 2016-04-28T10:00:18Z fix Flink source coder handling commit fff968b03177ba53f3bdad2055f67dc5633d5628 Author: Maximilian Michels <m...@apache.org> Date: 2016-04-28T10:02:05Z add Kafka IO examples
[1/2] incubator-beam git commit: [BEAM-207] Flink test flake in ReadSourceStreamingITCase
Repository: incubator-beam
Updated Branches:
  refs/heads/master b8951c231 -> d5b1d5135
[BEAM-207] Flink test flake in ReadSourceStreamingITCase
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd8bc93e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd8bc93e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd8bc93e
Branch: refs/heads/master
Commit: dd8bc93ec3104c481a9ea646406194c1116dae71
Parents: 70e6a13
Author: Maximilian Michels
Authored: Tue Apr 19 09:20:30 2016 +0200
Committer: Maximilian Michels
Committed: Wed Apr 20 19:17:50 2016 +0200
--
 .../flink/translation/wrappers/SourceInputFormat.java | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd8bc93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index dc11c77..debd1a1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -46,8 +46,8 @@ public class SourceInputFormat implements InputFormat
   private transient PipelineOptions options;
   private final SerializedPipelineOptions serializedOptions;

-  private transient BoundedSource.BoundedReader reader = null;
-  private boolean inputAvailable = true;
+  private transient BoundedSource.BoundedReader reader;
+  private boolean inputAvailable = false;

   public SourceInputFormat(BoundedSource initialSource, PipelineOptions options) {
     this.initialSource = initialSource;
@@ -135,6 +135,9 @@ public class SourceInputFormat implements InputFormat

   @Override
   public void close() throws IOException {
-    reader.close();
+    // TODO null check can be removed once FLINK-3796 is fixed
+    if (reader != null) {
+      reader.close();
+    }
   }
 }
[GitHub] incubator-beam pull request: [BEAM-207] Flink test flake in ReadSo...
GitHub user mxm opened a pull request:
https://github.com/apache/incubator-beam/pull/209
[BEAM-207] Flink test flake in ReadSourceStreamingITCase
The `configure(..)` lifecycle method is only called on the master, not on the worker nodes. This may lead to an incorrect initialization of the `Reader` because the `PipelineOptions` haven't been initialized.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/incubator-beam BEAM-207
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-beam/pull/209.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #209
commit a95e67c482d3378cf472e75a47275cedbf70de41
Author: Maximilian Michels <m...@apache.org>
Date: 2016-04-19T07:20:30Z
[BEAM-207] Flink test flake in ReadSourceStreamingITCase
The configure(..) life cycle method is only called on the master but not on the worker nodes. This may lead to an incorrect initialization of the Reader because the PipelineOptions haven't been initialized.
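Commit dd8bc93e guards `close()` against a reader that was never opened. The broader point of the PR, not relying on `configure(..)` to restore options on a worker, can be sketched without Flink or Beam. The class below is a hypothetical `LazyOptionsFormat`: a `HashMap` stands in for `PipelineOptions` and JDK serialization stands in for Jackson's `ObjectMapper`, so read it as an illustration of the pattern, not as the runner's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StringReader;
import java.util.HashMap;

// Hypothetical stand-in for SourceInputFormat; this is not the Flink or Beam API.
// A HashMap plays the role of PipelineOptions, and JDK serialization replaces Jackson.
class LazyOptionsFormat implements Serializable {
  private final byte[] serializedOptions;            // shipped together with the format
  private transient HashMap<String, String> options; // rebuilt lazily on the worker
  private transient StringReader reader;             // stays null until open() runs

  LazyOptionsFormat(HashMap<String, String> options) {
    this.serializedOptions = toBytes(options);
  }

  // May never be called on a worker, so it is not the only place options are restored.
  void configure() {
    ensureOptions();
  }

  void open(String split) {
    ensureOptions(); // does not depend on configure() having run
    reader = new StringReader(options.getOrDefault("prefix", "") + split);
  }

  String readAll() {
    StringBuilder sb = new StringBuilder();
    try {
      int c;
      while ((c = reader.read()) != -1) {
        sb.append((char) c);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return sb.toString();
  }

  boolean optionsLoaded() {
    return options != null;
  }

  void close() {
    // close() can run even if open() never did, so guard against a null reader.
    if (reader != null) {
      reader.close();
      reader = null;
    }
  }

  // Simulates shipping the format to a worker: transient fields come back as null.
  static LazyOptionsFormat shipToWorker(LazyOptionsFormat format) {
    return (LazyOptionsFormat) fromBytes(toBytes(format));
  }

  private void ensureOptions() {
    if (options == null) {
      @SuppressWarnings("unchecked")
      HashMap<String, String> restored = (HashMap<String, String>) fromBytes(serializedOptions);
      options = restored;
    }
  }

  private static byte[] toBytes(Serializable value) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(baos)) {
      out.writeObject(value);
      out.flush();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Couldn't serialize.", e);
    }
  }

  private static Object fromBytes(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException("Couldn't deserialize.", e);
    }
  }
}
```

Restoring on first use in `open()` keeps the worker correct whether or not the framework calls `configure()`, and the null check makes `close()` safe to call at any point.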
[2/3] incubator-beam git commit: [BEAM-196] abstraction for PipelineOptions serialization
[BEAM-196] abstraction for PipelineOptions serialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81577b31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81577b31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81577b31
Branch: refs/heads/master
Commit: 81577b31c2642522f7dd4ba8eba794df48a0ca56
Parents: 56e28a9
Author: Maximilian Michels
Authored: Mon Apr 18 17:40:38 2016 +0200
Committer: Maximilian Michels
Committed: Mon Apr 18 18:10:05 2016 +0200
--
 .../utils/SerializedPipelineOptions.java | 54
 .../beam/runners/flink/PipelineOptionsTest.java | 68
 2 files changed, 122 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
new file mode 100644
index 000..7439e02
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.translation.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  private byte[] serializedOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      new ObjectMapper().writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+
+  }
+
+  public PipelineOptions deserializeOptions() {
+    try {
+      return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
--
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
new file mode 100644
index 000..464c6df
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
[3/3] incubator-beam git commit: This closes #200
This closes #200
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70e6a131
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70e6a131
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70e6a131
Branch: refs/heads/master
Commit: 70e6a1310fd3871fdd10287d6a50e24edb7f6f18
Parents: bf78e96 43b5ec7
Author: Maximilian Michels
Authored: Tue Apr 19 09:44:51 2016 +0200
Committer: Maximilian Michels
Committed: Tue Apr 19 09:44:51 2016 +0200
--
 .../functions/FlinkDoFnFunction.java | 25 ++-
 .../functions/FlinkMultiOutputDoFnFunction.java | 27 ++--
 .../utils/SerializedPipelineOptions.java | 63 ++
 .../translation/wrappers/SinkOutputFormat.java | 28 ++--
 .../translation/wrappers/SourceInputFormat.java | 24 ++-
 .../FlinkGroupAlsoByWindowWrapper.java | 53 ++-
 .../streaming/io/UnboundedSourceWrapper.java | 28 ++--
 .../beam/runners/flink/PipelineOptionsTest.java | 68
 8 files changed, 157 insertions(+), 159 deletions(-)
--
[GitHub] incubator-beam pull request: [BEAM-196] Pipeline options must be a...
GitHub user mxm opened a pull request:
https://github.com/apache/incubator-beam/pull/200
[BEAM-196] Pipeline options must be available Context in DoFn.startBundle
This gets rid of the custom Java serialization code by always serializing the `PipelineOptions` to a byte array. So far, this has proven to be the most hassle-free method for the Flink Runner. To reuse code and avoid deserializing the byte array multiple times, the `SerializedPipelineOptions` class has been introduced. The changes also make the options accessible in the context of the `DoFn` function.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/incubator-beam BEAM-196
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-beam/pull/200.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #200
commit 81577b31c2642522f7dd4ba8eba794df48a0ca56
Author: Maximilian Michels <m...@apache.org>
Date: 2016-04-18T15:40:38Z
[BEAM-196] abstraction for PipelineOptions serialization
commit 43b5ec743718e63c2d9d9532e3ca55bc87370290
Author: Maximilian Michels <m...@apache.org>
Date: 2016-04-18T15:40:50Z
[BEAM-196] make use of SerializedPipelineOptions
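The PR gives avoiding repeated deserialization as one reason for `SerializedPipelineOptions`. The version in commit 81577b31 deserializes on every `deserializeOptions()` call, so any caching would have to live in a later revision or in the callers. A hedged, JDK-only sketch of the caching idea, using a hypothetical `CachedSerializedOptions` class in which a `HashMap` stands in for `PipelineOptions` and JDK serialization stands in for Jackson:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the SerializedPipelineOptions idea using only the JDK:
// a HashMap stands in for PipelineOptions and JDK serialization for Jackson.
class CachedSerializedOptions implements Serializable {
  private final byte[] serialized;              // the form that is shipped to the cluster
  private transient Map<String, String> cached; // deserialized at most once per copy
  private transient int deserializations;       // only here to make the caching observable

  CachedSerializedOptions(HashMap<String, String> options) {
    this.serialized = serialize(options);
  }

  synchronized Map<String, String> getOptions() {
    if (cached == null) {
      cached = deserialize(serialized);
      deserializations++;
    }
    return cached;
  }

  int deserializationCount() {
    return deserializations;
  }

  private static byte[] serialize(HashMap<String, String> options) {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(baos)) {
      out.writeObject(options);
      out.flush();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Couldn't serialize options.", e);
    }
  }

  @SuppressWarnings("unchecked")
  private static Map<String, String> deserialize(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (Map<String, String>) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException("Couldn't deserialize options.", e);
    }
  }
}
```

Because the cache field is `transient`, each deserialized copy of the holder on a worker starts empty and pays the deserialization cost once, on first access.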
[3/4] incubator-beam git commit: [flink] improve InputFormat wrapper and ReadSourceITCase
[flink] improve InputFormat wrapper and ReadSourceITCase
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6eac35e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6eac35e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6eac35e8
Branch: refs/heads/master
Commit: 6eac35e81e93c25da4668fc1b0d30f7c942383f0
Parents: 7646384
Author: Maximilian Michels
Authored: Wed Mar 30 16:43:04 2016 +0200
Committer: Maximilian Michels
Committed: Mon Apr 18 16:36:43 2016 +0200
--
 .../translation/wrappers/SourceInputFormat.java | 83 +++
 .../beam/runners/flink/ReadSourceITCase.java | 100 ++-
 2 files changed, 43 insertions(+), 140 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6eac35e8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 26e6297..4b11abc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -23,20 +23,20 @@ import org.apache.beam.sdk.options.PipelineOptions;

 import com.fasterxml.jackson.databind.ObjectMapper;

+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;

+
 /**
  * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
  * Dataflow {@link org.apache.beam.sdk.io.Source}.
@@ -45,37 +45,40 @@ public class SourceInputFormat implements InputFormat
   private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);

   private final BoundedSource initialSource;
+
   private transient PipelineOptions options;
+  private final byte[] serializedOptions;

-  private BoundedSource.BoundedReader reader = null;
-  private boolean reachedEnd = true;
+  private transient BoundedSource.BoundedReader reader = null;
+  private boolean inputAvailable = true;

   public SourceInputFormat(BoundedSource initialSource, PipelineOptions options) {
     this.initialSource = initialSource;
     this.options = options;
-  }

-  private void writeObject(ObjectOutputStream out)
-      throws IOException, ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, options);
-  }
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try {
+      new ObjectMapper().writeValue(baos, options);
+      serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }

-  private void readObject(ObjectInputStream in)
-      throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    options = mapper.readValue(in, PipelineOptions.class);
   }

   @Override
-  public void configure(Configuration configuration) {}
+  public void configure(Configuration configuration) {
+    try {
+      options = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+    }
+  }

   @Override
   public void open(SourceInputSplit sourceInputSplit) throws IOException {
     reader = ((BoundedSource) sourceInputSplit.getSource()).createReader(options);
-    reachedEnd = false;
+    inputAvailable = reader.start();
   }

   @Override
@@ -87,7 +90,6 @@ public class SourceInputFormat implements InputFormat
   @Override
   public long getTotalInputSize() {
     return estimatedSize;
-
   }

   @Override
@@ -110,17 +112,15 @@ public class SourceInputFormat implements InputFormat
   @Override
   @SuppressWarnings("unchecked")
[2/4] incubator-beam git commit: [flink] improvements to UnboundedSource translation
[flink] improvements to UnboundedSource translation
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c4f2dc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c4f2dc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c4f2dc1
Branch: refs/heads/master
Commit: 7c4f2dc1e74d7d985fef80cc3cbccb6e390d16aa
Parents: 6eac35e
Author: Maximilian Michels
Authored: Wed Mar 30 19:05:27 2016 +0200
Committer: Maximilian Michels
Committed: Mon Apr 18 16:36:43 2016 +0200
--
 .../FlinkStreamingTransformTranslators.java | 17 -
 1 file changed, 12 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c4f2dc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 541cd40..a1e9f47 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -194,19 +194,26 @@ public class FlinkStreamingTransformTranslators {
       DataStream source;
       if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
-        UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
+        @SuppressWarnings("unchecked")
+        UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
         source = context.getExecutionEnvironment()
             .addSource(flinkSource.getFlinkSource())
-            .flatMap(new FlatMapFunction () {
+            .flatMap(new FlatMapFunction () {
               @Override
-              public void flatMap(String s, Collector collector) throws Exception {
-                collector.collect(WindowedValue.of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+              public void flatMap(T s, Collector collector) throws Exception {
+                collector.collect(
+                    WindowedValue.of(
+                        s,
+                        Instant.now(),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING));
               }
-            }).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
+            }).assignTimestampsAndWatermarks(new IngestionTimeExtractor ());
       } else {
         source = context.getExecutionEnvironment()
             .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
       }
+
       context.setOutputDataStream(output, source);
     }
   }
[GitHub] incubator-beam pull request: [BEAM-158] add support for bounded so...
GitHub user mxm opened a pull request:
https://github.com/apache/incubator-beam/pull/104
[BEAM-158] add support for bounded sources in streaming
Apart from a few improvements, this PR introduces bounded sources in streaming. The BoundedSource wrapper (`SourceInputFormat`) is the same as in the batch part of the runner. The translator assigns ingestion-time watermarks and processing-time timestamps upon reading from the source. Watermark generation could be more flexible if we had an UnboundedSource wrapper for a BoundedSource.
Perhaps we could have a common utility for runners to handle serialization of PipelineOptions; at some point, they have to be shipped. I had to change the serialization code because I ran into a bug that led to a serialization loop. Debugging this was almost impossible because the stack trace doesn't show all serialization calls due to some magic in the VM. I didn't find any cyclic references between the PipelineOptions and Flink components. I'm assuming this is a bug, and the workaround of serializing the options to a byte array is good enough. See `SourceInputFormat`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/incubator-beam BEAM-158
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-beam/pull/104.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #104
commit f26674e7a7b30ee3f992edfc8e473df2a7ee3e80
Author: Maximilian Michels <m...@apache.org>
Date: 2016-03-30T14:43:04Z
[flink] improve InputFormat wrapper and ReadSourceITCase
commit 03404c7f4a5656bb5c5c0a2510f12e33292fef01
Author: Maximilian Michels <m...@apache.org>
Date: 2016-03-30T17:05:27Z
[flink] improvements to UnboundedSource translation
commit de574136a5b4ba9b75231b321d0190e23af3bac2
Author: Maximilian Michels <m...@apache.org>
Date: 2016-03-31T08:18:01Z
[BEAM-158] add support for bounded sources in streaming
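The translator described in this PR stamps each element with the processing time at which it is read, as in the generic `flatMap` from commit 7c4f2dc1 that calls `WindowedValue.of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)`. A minimal, framework-free sketch of that step; `Stamped` and `IngestionTimestamps` are hypothetical names, window and pane information are left out, and a `java.time.Clock` is injected in place of `Instant.now()` so the result can be checked:

```java
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Beam's WindowedValue: an element plus the
// processing-time timestamp assigned when it was read. Window and pane are omitted.
final class Stamped<T> {
  final T value;
  final Instant timestamp;

  Stamped(T value, Instant timestamp) {
    this.value = value;
    this.timestamp = timestamp;
  }
}

final class IngestionTimestamps {
  private IngestionTimestamps() {}

  // Generic over T, like the flatMap after commit 7c4f2dc1. The Clock is injected
  // (instead of calling Instant.now() directly) so the behavior can be tested.
  static <T> List<Stamped<T>> stampAll(Iterable<T> elements, Clock clock) {
    List<Stamped<T>> out = new ArrayList<>();
    for (T element : elements) {
      out.add(new Stamped<>(element, clock.instant()));
    }
    return out;
  }
}
```

In production code the clock would be `Clock.systemUTC()`, which gives the same behavior as `Instant.now()`.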
[3/3] incubator-beam git commit: This closes #94.
This closes #94.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96e286fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96e286fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96e286fe
Branch: refs/heads/master
Commit: 96e286fec758bb451ff383e6e7c3f2b5bb0cb840
Parents: 0c47cad 63a7c3d
Author: Maximilian Michels
Authored: Thu Mar 31 11:11:09 2016 +0200
Committer: Maximilian Michels
Committed: Thu Mar 31 11:11:09 2016 +0200
--
 .../FlinkGroupAlsoByWindowWrapper.java | 22 +---
 1 file changed, 14 insertions(+), 8 deletions(-)
--
[1/3] incubator-beam git commit: [flink] improve lifecycle handling of GroupAlsoByWindowWrapper
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0c47cad48 -> 96e286fec
[flink] improve lifecycle handling of GroupAlsoByWindowWrapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/033b9240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/033b9240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/033b9240
Branch: refs/heads/master
Commit: 033b9240765543438068c1adea6d0cff34ddcd53
Parents: 17863c8
Author: Maximilian Michels
Authored: Mon Mar 28 11:31:38 2016 +0200
Committer: Maximilian Michels
Committed: Wed Mar 30 11:31:56 2016 +0200
--
 .../wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/033b9240/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index b413d7a..751d44c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -220,6 +220,7 @@ public class FlinkGroupAlsoByWindowWrapper
   public void open() throws Exception {
     super.open();
     this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
+    operator.startBundle(context);
   }

   /**
@@ -252,11 +253,7 @@ public class FlinkGroupAlsoByWindowWrapper

   private void processKeyedWorkItem(KeyedWorkItem workItem) throws Exception {
     context.setElement(workItem, getStateInternalsForKey(workItem.key()));
-
-    // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
-    operator.startBundle(context);
     operator.processElement(context);
-    operator.finishBundle(context);
   }

   @Override
@@ -309,6 +306,7 @@ public class FlinkGroupAlsoByWindowWrapper

   @Override
   public void close() throws Exception {
+    operator.finishBundle(context);
     super.close();
   }
[GitHub] incubator-beam pull request: [flink] improve lifecycle handling of...
GitHub user mxm opened a pull request:
https://github.com/apache/incubator-beam/pull/94
[flink] improve lifecycle handling of GroupAlsoByWindowWrapper
Could someone check whether these minor changes are good to merge? @davorbonaci @kennknowles
Note that there are two commits: one for the lifecycle of the Operator and another for improving code readability.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/incubator-beam lifecycle
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-beam/pull/94.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #94
commit 033b9240765543438068c1adea6d0cff34ddcd53
Author: Maximilian Michels <m...@apache.org>
Date: 2016-03-28T09:31:38Z
[flink] improve lifecycle handling of GroupAlsoByWindowWrapper
commit 63a7c3d0cb51caf65dc82141671cf28d47c2be39
Author: Maximilian Michels <m...@apache.org>
Date: 2016-03-30T10:02:01Z
[flink] improve readability of processElement function
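Before commit 033b9240, `processKeyedWorkItem` wrapped every single element in its own `startBundle`/`finishBundle` pair; the change moves those calls to the operator's `open()` and `close()`. A minimal, framework-free sketch of the resulting call order; `RecordingFn` and `BundleLifecycleWrapper` are hypothetical names, not Flink or Beam classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical function that records which lifecycle methods were invoked.
class RecordingFn {
  final List<String> calls = new ArrayList<>();

  void startBundle() { calls.add("start"); }
  void processElement(String element) { calls.add("process:" + element); }
  void finishBundle() { calls.add("finish"); }
}

// Hypothetical wrapper mirroring commit 033b9240: startBundle() runs in open(),
// finishBundle() runs in close(), and each element only triggers processElement().
class BundleLifecycleWrapper {
  private final RecordingFn fn;

  BundleLifecycleWrapper(RecordingFn fn) { this.fn = fn; }

  void open() { fn.startBundle(); }
  void process(String element) { fn.processElement(element); }
  void close() { fn.finishBundle(); }
}
```

With two elements, the old placement would have recorded a start/finish pair around each element; here the bundle methods run once per operator instance.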
[GitHub] incubator-beam pull request: [BEAM-149] move language source confi...
GitHub user mxm opened a pull request:
https://github.com/apache/incubator-beam/pull/73
[BEAM-149] move language source config to pluginManagement
This fixes issues with the source/target level in IntelliJ.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/incubator-beam BEAM-149
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-beam/pull/73.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #73
commit d869d7d371654b694bde498ae6b2b4a1d320850c
Author: Maximilian Michels <m...@apache.org>
Date: 2016-03-24T17:54:05Z
[BEAM-149] move language source config to pluginManagement
This fixes issues with the source/target level in IntelliJ.
[05/10] incubator-beam git commit: Update AutoComplete.java
Update AutoComplete.java
Allow for the Datastore dependency of this test to be in a different project than the main project for the job.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/834d0710
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/834d0710
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/834d0710
Branch: refs/heads/master
Commit: 834d071060cb916bedc0859baa256791ab22b7d4
Parents: 158f9f8
Author: sammcveety
Authored: Tue Mar 22 11:58:19 2016 -0700
Committer: Maximilian Michels
Committed: Wed Mar 23 19:27:51 2016 +0100
--
 .../cloud/dataflow/examples/complete/AutoComplete.java | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834d0710/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
--
diff --git a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
index 1bccc4a..f897338 100644
--- a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
+++ b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
@@ -57,6 +57,7 @@ import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PBegin;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;

 import org.joda.time.Duration;
@@ -432,10 +433,14 @@ public class AutoComplete {
     Boolean getOutputToBigQuery();
     void setOutputToBigQuery(Boolean value);

-    @Description("Whether output to Datastoree")
+    @Description("Whether output to Datastore")
     @Default.Boolean(false)
     Boolean getOutputToDatastore();
     void setOutputToDatastore(Boolean value);
+
+    @Description("Datastore output dataset ID, defaults to project ID")
+    String getOutputDataset();
+    void setOutputDataset(String value);
   }

   public static void main(String[] args) throws IOException {
@@ -477,7 +482,8 @@ public class AutoComplete {
     if (options.getOutputToDatastore()) {
       toWrite
         .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
-        .apply(DatastoreIO.writeTo(options.getProject()));
+        .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
+            options.getOutputDataset(), options.getProject())));
     }
     if (options.getOutputToBigQuery()) {
       dataflowUtils.setupBigQueryTable();
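The new `getOutputDataset()` option falls back to the project ID through Guava's `MoreObjects.firstNonNull`, which returns its first argument if it is non-null and otherwise the second, throwing `NullPointerException` if both are null. On Java 9+, `java.util.Objects.requireNonNullElse` has the same contract. The helper below is a hypothetical illustration of the fallback, and `my-project` / `other-dataset` in the checks are placeholder values:

```java
import java.util.Objects;

// Hypothetical helper mirroring the AutoComplete change: prefer the explicit
// Datastore dataset and fall back to the project ID. Requires Java 9+.
final class DatasetResolver {
  private DatasetResolver() {}

  static String resolveDataset(String outputDataset, String project) {
    // Like Guava's MoreObjects.firstNonNull: throws NullPointerException if both are null.
    return Objects.requireNonNullElse(outputDataset, project);
  }
}
```

Leaving the option without a `@Default` annotation is what makes the fallback work: an unset `getOutputDataset()` returns `null`, so the project ID is used.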
[10/10] incubator-beam git commit: This closes #69
This closes #69
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f902582
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f902582
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f902582
Branch: refs/heads/master
Commit: 2f902582c1a52c1bb95edc647007eb4a83964508
Parents: 8d87ee0 1504ba7
Author: Maximilian Michels
Authored: Wed Mar 23 19:30:00 2016 +0100
Committer: Maximilian Michels
Committed: Wed Mar 23 19:30:00 2016 +0100
--
 .../streaming/io/UnboundedSourceWrapper.java | 87 +---
 .../flink/streaming/UnboundedSourceITCase.java | 210 +++
 2 files changed, 272 insertions(+), 25 deletions(-)
--
[03/10] incubator-beam git commit: Add DisplayData builder API to SDK
Add DisplayData builder API to SDK

This allows generating the display data which will be attached to PTransforms.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ecb7aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ecb7aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ecb7aa7

Branch: refs/heads/master
Commit: 5ecb7aa7a8ac107e2bdb8518da2bee714ceba122
Parents: cb5d6c2
Author: Scott Wegner
Authored: Thu Mar 17 10:22:42 2016 -0700
Committer: Maximilian Michels
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 sdk/pom.xml | 7 +
 .../cloud/dataflow/sdk/transforms/DoFn.java | 13 +-
 .../dataflow/sdk/transforms/PTransform.java | 14 +-
 .../cloud/dataflow/sdk/transforms/ParDo.java | 13 +
 .../sdk/transforms/display/DisplayData.java | 517 +++
 .../sdk/transforms/display/HasDisplayData.java | 53 ++
 .../cloud/dataflow/sdk/transforms/DoFnTest.java | 15 +
 .../dataflow/sdk/transforms/PTransformTest.java | 41 ++
 .../dataflow/sdk/transforms/ParDoTest.java | 23 +
 .../transforms/display/DisplayDataMatchers.java | 98 +++
 .../display/DisplayDataMatchersTest.java | 81 +++
 .../sdk/transforms/display/DisplayDataTest.java | 633 +++
 .../cloud/dataflow/sdk/util/ApiSurfaceTest.java | 3 +-
 13 files changed, 1508 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ecb7aa7/sdk/pom.xml
--
diff --git a/sdk/pom.xml b/sdk/pom.xml
index 71f5097..185abc2 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -615,6 +615,13 @@
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava-testlib</artifactId>
+      <version>${guava.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>${protobuf.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ecb7aa7/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
--
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
index af06cc8..5ba9992 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
@@ -24,6 +24,8 @@ import com.google.cloud.dataflow.sdk.annotations.Experimental;
 import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
+import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
 import com.google.cloud.dataflow.sdk.util.WindowingInternals;
@@ -69,7 +71,7 @@ import java.util.UUID;
  * @param <InputT> the type of the (main) input elements
  * @param <OutputT> the type of the (main) output elements
  */
-public abstract class DoFn<InputT, OutputT> implements Serializable {
+public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData {
 
   /**
    * Information accessible to all methods in this {@code DoFn}.
@@ -366,6 +368,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable {
   public void finishBundle(Context c) throws Exception {
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * By default, does not register any display data. Implementors may override this method
+   * to provide their own display metadata.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+  }
 /

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ecb7aa7/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
--
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
index 8a74509..d4496b8 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
@@ -19,6 +19,8 @@ package com.google.cloud.dataflow.sdk.transforms;
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
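The DoFn change above makes every DoFn a HasDisplayData whose populateDisplayData is a no-op by default, so subclasses opt in by overriding it. The following is a minimal, self-contained sketch of that opt-in hook pattern. The Builder class, its add(...) method, BaseFn and PrefixFn are hypothetical stand-ins for illustration only; they are not the SDK's DisplayData API, which this email does not show.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a default no-op display-data hook; all names here are hypothetical. */
public class DisplayDataSketch {

  /** Hypothetical builder that collects key/value display metadata in insertion order. */
  static final class Builder {
    private final Map<String, Object> items = new LinkedHashMap<>();

    Builder add(String key, Object value) {
      items.put(key, value);
      return this;
    }

    Map<String, Object> build() {
      return new LinkedHashMap<>(items);
    }
  }

  /** Hypothetical counterpart of the HasDisplayData interface. */
  interface HasDisplayData {
    void populateDisplayData(Builder builder);
  }

  /** Base function that registers nothing by default, mirroring the DoFn change. */
  abstract static class BaseFn implements HasDisplayData {
    @Override
    public void populateDisplayData(Builder builder) {
      // Intentionally empty: subclasses opt in by overriding.
    }
  }

  /** Example subclass that contributes its own metadata. */
  static final class PrefixFn extends BaseFn {
    private final String prefix;

    PrefixFn(String prefix) {
      this.prefix = prefix;
    }

    @Override
    public void populateDisplayData(Builder builder) {
      builder.add("prefix", prefix);
    }
  }

  /** Asks a component to describe itself and returns the collected items. */
  static Map<String, Object> displayDataOf(HasDisplayData component) {
    Builder builder = new Builder();
    component.populateDisplayData(builder);
    return builder.build();
  }

  public static void main(String[] args) {
    System.out.println(displayDataOf(new BaseFn() {}));       // prints {}
    System.out.println(displayDataOf(new PrefixFn("beam-"))); // prints {prefix=beam-}
  }
}
```

Keeping the default on the base class means existing DoFn subclasses keep compiling unchanged, which fits an additive API change like this one.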
[02/10] incubator-beam git commit: Implement InProcessPipelineRunner#run
Implement InProcessPipelineRunner#run

Appropriately construct an evaluation context and executor, and start the
pipeline when run is called.

Implement InProcessPipelineResult.

Apply PTransform overrides.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/158f9f8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/158f9f8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/158f9f8d

Branch: refs/heads/master
Commit: 158f9f8d41c63f5a002c6187f4f05f169579dd6d
Parents: 5ecb7aa
Author: Thomas Groh
Authored: Fri Feb 26 17:30:13 2016 -0800
Committer: Maximilian Michels
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 .../CachedThreadPoolExecutorServiceFactory.java | 42
 .../ConsumerTrackingPipelineVisitor.java | 173 ++
 .../inprocess/ExecutorServiceFactory.java | 32 +++
 .../ExecutorServiceParallelExecutor.java | 2 +-
 .../inprocess/GroupByKeyEvaluatorFactory.java | 4 +-
 .../inprocess/InProcessPipelineOptions.java | 56 +
 .../inprocess/InProcessPipelineRunner.java | 228 +++---
 .../inprocess/KeyedPValueTrackingVisitor.java | 95
 .../ConsumerTrackingPipelineVisitorTest.java | 233 +++
 .../inprocess/InProcessPipelineRunnerTest.java | 77 ++
 .../KeyedPValueTrackingVisitorTest.java | 189 +++
 11 files changed, 1101 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/158f9f8d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
--
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
new file mode 100644
index 000..3350d2b
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ExecutorServiceFactory} that produces cached thread pools via
+ * {@link Executors#newCachedThreadPool()}.
+ */
+class CachedThreadPoolExecutorServiceFactory
+    implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory {
+  private static final CachedThreadPoolExecutorServiceFactory INSTANCE =
+      new CachedThreadPoolExecutorServiceFactory();
+
+  @Override
+  public ExecutorServiceFactory create(PipelineOptions options) {
+    return INSTANCE;
+  }
+
+  @Override
+  public ExecutorService create() {
+    return Executors.newCachedThreadPool();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/158f9f8d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
--
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
new file mode 100644
index 000..c602b23
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ *
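CachedThreadPoolExecutorServiceFactory combines two roles: a shared singleton (returned as the default option value) and a factory that hands out a fresh cached thread pool on every create() call. The following is a minimal, self-contained sketch of just the factory half. The local ExecutorServiceFactory interface is a stand-in for the runner's package-private one, and the DefaultValueFactory side is omitted because it depends on the SDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch of a stateless singleton factory that creates a new cached pool per call. */
public class ExecutorFactorySketch {

  /** Local stand-in for the runner's ExecutorServiceFactory interface. */
  interface ExecutorServiceFactory {
    ExecutorService create();
  }

  static final class CachedPoolFactory implements ExecutorServiceFactory {
    static final CachedPoolFactory INSTANCE = new CachedPoolFactory();

    private CachedPoolFactory() {}

    @Override
    public ExecutorService create() {
      // Threads are created on demand and reclaimed after 60 seconds of idleness.
      return Executors.newCachedThreadPool();
    }
  }

  /** Runs one task on a freshly created executor and always shuts that executor down. */
  static int runOnce(ExecutorServiceFactory factory) {
    ExecutorService executor = factory.create();
    try {
      Future<Integer> result = executor.submit(() -> 6 * 7);
      return result.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runOnce(CachedPoolFactory.INSTANCE)); // prints 42
  }
}
```

Because the factory itself holds no state, sharing one instance is safe; each caller owns, and must shut down, the pool it receives.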
[01/10] incubator-beam git commit: make BigtableIO#Read#withRowFilter public
Repository: incubator-beam
Updated Branches:
  refs/heads/master 8d87ee02b -> 2f902582c


make BigtableIO#Read#withRowFilter public

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e39b5d9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e39b5d9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e39b5d9a

Branch: refs/heads/master
Commit: e39b5d9afec2bfa64552a63983eaac1df5da6a8d
Parents: 834d071
Author: Neville Li
Authored: Wed Mar 23 06:05:51 2016 +0800
Committer: Maximilian Michels
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 .../java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e39b5d9a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
--
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
index 81a85fa..7d59b09 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
@@ -211,7 +211,7 @@ public class BigtableIO {
      *
      * Does not modify this object.
      */
-    Read withRowFilter(RowFilter filter) {
+    public Read withRowFilter(RowFilter filter) {
       checkNotNull(filter, "filter");
       return new Read(options, tableId, filter, bigtableService);
     }
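withRowFilter follows the copy-on-write "with" style: it rejects a null argument and returns a new Read rather than mutating the receiver ("Does not modify this object"). This commit only makes it public so that callers outside the package can use it. The following is a minimal, self-contained sketch of the same pattern. ReadSpec and its String-typed filter are hypothetical simplifications of Read and RowFilter, and Objects.requireNonNull stands in for Guava's checkNotNull; both throw NullPointerException on null.

```java
import java.util.Objects;

/** Sketch of an immutable spec whose "with" method returns a modified copy. */
public class WithFilterSketch {

  static final class ReadSpec {
    private final String tableId;
    private final String filter; // null means "no filter"

    ReadSpec(String tableId, String filter) {
      this.tableId = tableId;
      this.filter = filter;
    }

    /** Returns a copy that uses the given filter. Does not modify this object. */
    public ReadSpec withFilter(String filter) {
      Objects.requireNonNull(filter, "filter");
      return new ReadSpec(tableId, filter);
    }

    String getFilter() {
      return filter;
    }
  }

  public static void main(String[] args) {
    ReadSpec base = new ReadSpec("my-table", null);
    ReadSpec filtered = base.withFilter("cf:col");
    System.out.println(base.getFilter());     // prints null
    System.out.println(filtered.getFilter()); // prints cf:col
  }
}
```

Returning copies keeps a partially configured spec safe to reuse as a template for several differently filtered reads.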
[09/10] incubator-beam git commit: [BEAM-143] [flink] add test for UnboundedSourceWrapper
[BEAM-143] [flink] add test for UnboundedSourceWrapper

The test ensures serialization and execution of the wrapper works as
expected.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1504ba7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1504ba7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1504ba7b

Branch: refs/heads/master
Commit: 1504ba7b771173e8ad1cb8b1714d38f2410ef706
Parents: ac5a1e8
Author: Maximilian Michels
Authored: Wed Mar 23 16:19:27 2016 +0100
Committer: Maximilian Michels
Committed: Wed Mar 23 19:29:14 2016 +0100

--
 .../flink/streaming/UnboundedSourceITCase.java | 210 +++
 1 file changed, 210 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1504ba7b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
--
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
new file mode 100644
index 000..f36028e
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.flink.FlinkTestPipeline;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Instant;
+import org.junit.internal.ArrayComparisonFailure;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+
+public class UnboundedSourceITCase extends StreamingProgramTestBase {
+
+  protected static String resultPath;
+
+  public UnboundedSourceITCase() {
+  }
+
+  static final String[] EXPECTED_RESULT = new String[]{
+      "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    PCollection result = p
+        .apply(Read.from(new RangeReadSource(1, 10)))
+        .apply(Window.into(new GlobalWindows())
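The test reads from RangeReadSource(1, 10) and expects the lines "1" through "9", which suggests that the source emits the half-open range [from, to). RangeReadSource itself is cut off in this email, so the following is only a hypothetical sketch of such a reader. It follows the start()/advance()/getCurrent() protocol that the SDK's source readers use, with getCurrent() throwing NoSuchElementException when there is no current element. RangeReader and readAll are illustrative names, not the test's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical reader over the half-open integer range [from, to). */
public class RangeReaderSketch {

  static final class RangeReader {
    private final int to;
    private int current;
    private boolean started;

    RangeReader(int from, int to) {
      this.current = from;
      this.to = to;
    }

    /** Positions on the first element; returns false if the range is empty. */
    boolean start() {
      started = true;
      return current < to;
    }

    /** Moves to the next element; returns false once the range is exhausted. */
    boolean advance() {
      current++;
      return current < to;
    }

    int getCurrent() {
      if (!started || current >= to) {
        throw new NoSuchElementException();
      }
      return current;
    }
  }

  /** Drains a reader the way a runner would: start() once, then advance() until false. */
  static List<String> readAll(int from, int to) {
    List<String> out = new ArrayList<>();
    RangeReader reader = new RangeReader(from, to);
    for (boolean more = reader.start(); more; more = reader.advance()) {
      out.add(String.valueOf(reader.getCurrent()));
    }
    return out;
  }

  public static void main(String[] args) {
    // Same shape as EXPECTED_RESULT joined with '\n': "1" through "9".
    System.out.println(String.join("\n", readAll(1, 10)));
  }
}
```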
[04/10] incubator-beam git commit: [flink] Add FlinkRunnerRegistrar
[flink] Add FlinkRunnerRegistrar

Expose Flink runner and options via AutoService. AutoService will at compile
time populate META-INF/services so that Dataflow sdk can seamlessly pick up
FlinkRunner.

This closes #40.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e94e7d6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e94e7d6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e94e7d6a

Branch: refs/heads/master
Commit: e94e7d6a5c227eec79d696acd963a88286a487a9
Parents: a20e0b6
Author: Rafal Wojdyla
Authored: Fri Mar 11 13:39:55 2016 -0500
Committer: Maximilian Michels
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 runners/flink/runner/pom.xml | 12 +++--
 .../runners/flink/FlinkRunnerRegistrar.java | 56
 2 files changed, 63 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e94e7d6a/runners/flink/runner/pom.xml
--
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 212b973..ff4b368 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -84,11 +84,6 @@
-    <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service</artifactId>
-      <version>1.0-rc2</version>
-    </dependency>
       <groupId>com.google.cloud.dataflow</groupId>
@@ -121,6 +116,13 @@
       <version>1.9.5</version>
       <scope>test</scope>
+
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <version>1.0-rc2</version>
+      <optional>true</optional>
+    </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e94e7d6a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
new file mode 100644
index 000..3e30ab9
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * AuteService registrar - will register FlinkRunner and FlinkOptions
+ * as possible pipeline runner services.
+ *
+ * It ends up in META-INF/services and gets picked up by Dataflow.
+ *
+ */
+public class FlinkRunnerRegistrar {
+  private FlinkRunnerRegistrar() { }
+
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkPipelineRunner.class);
+    }
+  }
+
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(FlinkPipelineOptions.class);
+    }
+  }
+}
[06/10] incubator-beam git commit: [BEAM-116] change runners artifactId to runners-parent
[BEAM-116] change runners artifactId to runners-parent

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a20e0b64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a20e0b64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a20e0b64

Branch: refs/heads/master
Commit: a20e0b6405b1741f7b5e59bd433a0ab5b038a6b3
Parents: 91de072
Author: Maximilian Michels
Authored: Wed Mar 16 12:03:07 2016 +0100
Committer: Maximilian Michels
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 runners/flink/pom.xml | 2 +-
 runners/pom.xml | 2 +-
 runners/spark/pom.xml | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a20e0b64/runners/flink/pom.xml
--
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a340107..31713cd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -24,7 +24,7 @@
     <groupId>org.apache.beam</groupId>
-    <artifactId>runners</artifactId>
+    <artifactId>runners-parent</artifactId>
     <version>1.6.0-SNAPSHOT</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a20e0b64/runners/pom.xml
--
diff --git a/runners/pom.xml b/runners/pom.xml
index fbd6c41..b2e9eb1 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -29,7 +29,7 @@
   <groupId>org.apache.beam</groupId>
-  <artifactId>runners</artifactId>
+  <artifactId>runners-parent</artifactId>
   <version>1.6.0-SNAPSHOT</version>
   <packaging>pom</packaging>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a20e0b64/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 6dcc95d..9d653a0 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -16,7 +16,7 @@ License.
     <groupId>org.apache.beam</groupId>
-    <artifactId>runners</artifactId>
+    <artifactId>runners-parent</artifactId>
     <version>1.6.0-SNAPSHOT</version>
[07/10] incubator-beam git commit: [flink] add test case for Runner registration
[flink] add test case for Runner registration

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cb5d6c2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cb5d6c2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cb5d6c2b

Branch: refs/heads/master
Commit: cb5d6c2bf577c0889adc28c06475b5ec22f027da
Parents: e94e7d6
Author: Maximilian Michels
Authored: Mon Mar 21 11:36:35 2016 +0100
Committer: Maximilian Michels
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 .../runners/flink/FlinkRunnerRegistrarTest.java | 48
 1 file changed, 48 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cb5d6c2b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
--
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
new file mode 100644
index 000..45ca830
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the proper registration of the Flink runner.
+ */
+public class FlinkRunnerRegistrarTest {
+
+  @Test
+  public void testFullName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", FlinkPipelineRunner.class.getName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+  }
+
+  @Test
+  public void testClassName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", FlinkPipelineRunner.class.getSimpleName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+  }
+
+}
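The two test cases check that --runner resolves to FlinkPipelineRunner whether it is given the fully qualified class name or only the simple name. The following is a minimal sketch of that matching rule over a list of registered classes. It is not PipelineOptionsFactory's actual implementation, and the nested FlinkPipelineRunner and DirectPipelineRunner classes are empty placeholders that exist only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Sketch of resolving a --runner value by full class name or simple name. */
public class RunnerLookupSketch {

  static final class FlinkPipelineRunner {}  // placeholder for a registered runner
  static final class DirectPipelineRunner {} // placeholder for another registered runner

  /** Returns the first registered class whose full or simple name matches the argument. */
  static Optional<Class<?>> resolve(String arg, List<Class<?>> registered) {
    String prefix = "--runner=";
    String value = arg.startsWith(prefix) ? arg.substring(prefix.length()) : arg;
    for (Class<?> runner : registered) {
      if (runner.getName().equals(value) || runner.getSimpleName().equals(value)) {
        return Optional.of(runner);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<Class<?>> registered =
        Arrays.<Class<?>>asList(FlinkPipelineRunner.class, DirectPipelineRunner.class);
    String fullName = "--runner=" + FlinkPipelineRunner.class.getName();
    String simpleName = "--runner=" + FlinkPipelineRunner.class.getSimpleName();
    System.out.println(resolve(fullName, registered).isPresent());   // prints true
    System.out.println(resolve(simpleName, registered).isPresent()); // prints true
  }
}
```

The registrar's job is only to make the runner class appear in the registered list. Matching by simple name is what lets users write --runner=FlinkPipelineRunner without the package prefix.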
[2/3] incubator-beam git commit: [flink] Add FlinkRunnerRegistrar
[flink] Add FlinkRunnerRegistrar

Expose Flink runner and options via AutoService. AutoService will at compile
time populate META-INF/services so that Dataflow sdk can seamlessly pick up
FlinkRunner.

This closes #40.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/086a35e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/086a35e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/086a35e9

Branch: refs/heads/master
Commit: 086a35e9d0d63d5ec15f2520146923066995492a
Parents: fcc6f3c
Author: Rafal Wojdyla
Authored: Fri Mar 11 13:39:55 2016 -0500
Committer: Maximilian Michels
Committed: Mon Mar 21 11:38:30 2016 +0100

--
 runners/flink/runner/pom.xml | 12 +++--
 .../runners/flink/FlinkRunnerRegistrar.java | 56
 2 files changed, 63 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/086a35e9/runners/flink/runner/pom.xml
--
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 212b973..ff4b368 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -84,11 +84,6 @@
-    <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service</artifactId>
-      <version>1.0-rc2</version>
-    </dependency>
       <groupId>com.google.cloud.dataflow</groupId>
@@ -121,6 +116,13 @@
       <version>1.9.5</version>
       <scope>test</scope>
+
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <version>1.0-rc2</version>
+      <optional>true</optional>
+    </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/086a35e9/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
new file mode 100644
index 000..3e30ab9
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * AuteService registrar - will register FlinkRunner and FlinkOptions
+ * as possible pipeline runner services.
+ *
+ * It ends up in META-INF/services and gets picked up by Dataflow.
+ *
+ */
+public class FlinkRunnerRegistrar {
+  private FlinkRunnerRegistrar() { }
+
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkPipelineRunner.class);
+    }
+  }
+
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(FlinkPipelineOptions.class);
+    }
+  }
+}
[1/3] incubator-beam git commit: [flink] add test case for Runner registration
Repository: incubator-beam
Updated Branches:
  refs/heads/master fcc6f3cfd -> c984f3ae2


[flink] add test case for Runner registration

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ba7b7a04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ba7b7a04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ba7b7a04

Branch: refs/heads/master
Commit: ba7b7a0447417b8c08c0748a5d60b598da7b8e4e
Parents: 086a35e
Author: Maximilian Michels
Authored: Mon Mar 21 11:36:35 2016 +0100
Committer: Maximilian Michels
Committed: Mon Mar 21 11:38:30 2016 +0100

--
 .../runners/flink/FlinkRunnerRegistrarTest.java | 48
 1 file changed, 48 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ba7b7a04/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
--
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
new file mode 100644
index 000..45ca830
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the proper registration of the Flink runner.
+ */
+public class FlinkRunnerRegistrarTest {
+
+  @Test
+  public void testFullName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", FlinkPipelineRunner.class.getName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+  }
+
+  @Test
+  public void testClassName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", FlinkPipelineRunner.class.getSimpleName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+  }
+
+}
[1/2] incubator-beam git commit: [BEAM-116] change runners artifactId to runners-parent
Repository: incubator-beam
Updated Branches:
  refs/heads/master f7aaee2ea -> fcc6f3cfd


[BEAM-116] change runners artifactId to runners-parent

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/447c8af2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/447c8af2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/447c8af2

Branch: refs/heads/master
Commit: 447c8af23b3fa727bba5aa97093699745d6e6f5b
Parents: 5b5c0e2
Author: Maximilian Michels
Authored: Wed Mar 16 12:03:07 2016 +0100
Committer: Maximilian Michels
Committed: Fri Mar 18 12:05:07 2016 +0100

--
 runners/flink/pom.xml | 2 +-
 runners/pom.xml | 2 +-
 runners/spark/pom.xml | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/447c8af2/runners/flink/pom.xml
--
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a340107..31713cd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -24,7 +24,7 @@
     <groupId>org.apache.beam</groupId>
-    <artifactId>runners</artifactId>
+    <artifactId>runners-parent</artifactId>
     <version>1.6.0-SNAPSHOT</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/447c8af2/runners/pom.xml
--
diff --git a/runners/pom.xml b/runners/pom.xml
index fbd6c41..b2e9eb1 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -29,7 +29,7 @@
   <groupId>org.apache.beam</groupId>
-  <artifactId>runners</artifactId>
+  <artifactId>runners-parent</artifactId>
   <version>1.6.0-SNAPSHOT</version>
   <packaging>pom</packaging>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/447c8af2/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 6dcc95d..9d653a0 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -16,7 +16,7 @@ License.
     <groupId>org.apache.beam</groupId>
-    <artifactId>runners</artifactId>
+    <artifactId>runners-parent</artifactId>
     <version>1.6.0-SNAPSHOT</version>
[2/2] incubator-beam git commit: [BEAM-116] This closes #59
[BEAM-116] This closes #59

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fcc6f3cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fcc6f3cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fcc6f3cf

Branch: refs/heads/master
Commit: fcc6f3cfd111ab222063e414579c966916299060
Parents: f7aaee2 447c8af
Author: Maximilian Michels
Authored: Sun Mar 20 19:34:22 2016 +0100
Committer: Maximilian Michels
Committed: Sun Mar 20 19:48:25 2016 +0100

--
 runners/flink/pom.xml | 2 +-
 runners/pom.xml | 2 +-
 runners/spark/pom.xml | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--
[GitHub] incubator-beam pull request: [BEAM-116] change runners artifactId ...
GitHub user mxm opened a pull request:

    https://github.com/apache/incubator-beam/pull/59

    [BEAM-116] change runners artifactId to runners-parent

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/incubator-beam BEAM-116

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/59.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #59

commit 447c8af23b3fa727bba5aa97093699745d6e6f5b
Author: Maximilian Michels <m...@apache.org>
Date:   2016-03-16T11:03:07Z

    [BEAM-116] change runners artifactId to runners-parent

---
incubator-beam git commit: [flink] improvements to the Kafka Example
Repository: incubator-beam
Updated Branches:
refs/heads/master 0f137169e -> ef1e32dee

[flink] improvements to the Kafka Example

- use timestamp extractor after ingestion
- fix coder runtime exception
- correct logging

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1e32de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1e32de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1e32de

Branch: refs/heads/master
Commit: ef1e32deefb9886584556c7125e87b2873c63ebf
Parents: 0f13716
Author: Maximilian Michels
Authored: Thu Mar 17 14:49:09 2016 +0100
Committer: Maximilian Michels
Committed: Thu Mar 17 14:53:41 2016 +0100

--
.../examples/streaming/KafkaWindowedWordCountExample.java | 2 +-
.../flink/translation/FlinkStreamingTransformTranslators.java | 3 ++-
.../wrappers/streaming/io/UnboundedFlinkSource.java | 7 +++
runners/flink/runner/src/main/resources/log4j.properties | 2 +-
4 files changed, 7 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
--
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 3942d0d..8fca1d4 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -103,7 +103,7 @@ public class KafkaWindowedWordCountExample {
public static void main(String[] args) {
PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
-options.setJobName("KafkaExample");
+options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
options.setNumberOfExecutionRetries(5);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index bdefeaf..2b9b0ee 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -179,7 +180,7 @@ public class FlinkStreamingTransformTranslators {
public void flatMap(String s, Collector collector) throws Exception {
collector.collect(WindowedValue.of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}
-});
+}).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
} else {
source = context.getExecutionEnvironment()
.addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 05a8956..82984cb 100644
---
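The commit above chains `.assignTimestampsAndWatermarks(new IngestionTimeExtractor())` onto the Kafka source so that each element carries the time at which it was ingested. As a rough, stdlib-only illustration of that idea (`IngestionTimeStamper` and its methods are hypothetical and are not Flink's API), the sketch below stamps elements with the current clock time, never lets timestamps move backwards when the clock is reset, and keeps the watermark one millisecond behind the newest timestamp. That last detail is an assumption about how Flink's extractor behaves; check the Flink source before relying on it.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified model of ingestion-time timestamping.
 * Not Flink's IngestionTimeExtractor; it only illustrates the idea.
 */
final class IngestionTimeStamper {
  private final LongSupplier clock;          // injectable clock so the behavior is testable
  private long maxTimestamp = Long.MIN_VALUE; // newest timestamp handed out so far

  IngestionTimeStamper(LongSupplier clock) {
    this.clock = clock;
  }

  /** Assigns the current clock time, but never a value older than one already assigned. */
  long extractTimestamp() {
    maxTimestamp = Math.max(clock.getAsLong(), maxTimestamp);
    return maxTimestamp;
  }

  /** Assumed behavior: the watermark trails the newest timestamp by one millisecond. */
  long currentWatermark() {
    maxTimestamp = Math.max(clock.getAsLong(), maxTimestamp);
    return maxTimestamp - 1;
  }

  public static void main(String[] args) {
    IngestionTimeStamper stamper = new IngestionTimeStamper(System::currentTimeMillis);
    long timestamp = stamper.extractTimestamp();
    long watermark = stamper.currentWatermark();
    System.out.println("timestamp=" + timestamp + ", watermark=" + watermark);
  }
}
```

Injecting the clock as a `LongSupplier` is a design choice for the sketch: it makes the "clock jumps backwards" case reproducible without sleeping or mocking system time.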
incubator-beam git commit: [flink] fix UnboundedFlinkSource wrapper
Repository: incubator-beam
Updated Branches:
refs/heads/master a9c46057e -> 0f137169e

[flink] fix UnboundedFlinkSource wrapper

- remove unnecessary PipelineOptions cache
- use the correct interface types
- improve Kafka example

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f137169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f137169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f137169

Branch: refs/heads/master
Commit: 0f137169e4c2cd8d3e5a86c91bc2f401d276e8ed
Parents: a9c4605
Author: Maximilian Michels
Authored: Thu Mar 17 12:26:03 2016 +0100
Committer: Maximilian Michels
Committed: Thu Mar 17 12:27:37 2016 +0100

--
.../KafkaWindowedWordCountExample.java | 7 ++--
.../streaming/io/UnboundedFlinkSource.java | 37 ++--
2 files changed, 22 insertions(+), 22 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
--
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 55cdc22..3942d0d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -22,7 +22,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
@@ -30,7 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.joda.time.Duration;
@@ -121,12 +120,12 @@ public class KafkaWindowedWordCountExample {
// this is the Flink consumer that reads the input to
// the program from a kafka topic.
-FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>(
+FlinkKafkaConsumer08 kafkaConsumer = new FlinkKafkaConsumer08<>(
options.getKafkaTopic(),
new SimpleStringSchema(), p);
PCollection words = pipeline
-.apply(Read.from(new UnboundedFlinkSource (options, kafkaConsumer)).named("StreamingWordCount"))
+ .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer)))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize(
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 2857efd..05a8956 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -22,7 +22,9 @@ import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import
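This commit also drops the `options` argument from the wrapper and switches the example to the static factory `UnboundedFlinkSource.of(kafkaConsumer)`. A minimal sketch of that shape, assuming nothing about the real class beyond what the diff shows: `WrappedSource` and `ElementSource` are hypothetical stand-ins, the wrapper holds only the wrapped source, and the generic factory lets the compiler infer the element type from its argument.

```java
/**
 * Hypothetical wrapper illustrating the shape of the change: it stores only
 * the wrapped source (no cached options) and is built via a static factory.
 */
final class WrappedSource<T> {

  /** Hypothetical stand-in for a Flink source function producing elements of type E. */
  interface ElementSource<E> {
    E next();
  }

  private final ElementSource<T> source;

  private WrappedSource(ElementSource<T> source) {
    if (source == null) {
      throw new NullPointerException("source must not be null");
    }
    this.source = source;
  }

  /** Static factory: the element type E is inferred from the argument. */
  static <E> WrappedSource<E> of(ElementSource<E> source) {
    return new WrappedSource<>(source);
  }

  ElementSource<T> getSource() {
    return source;
  }

  public static void main(String[] args) {
    WrappedSource<String> words = WrappedSource.of(() -> "hello");
    System.out.println(words.getSource().next());
  }
}
```

Compared with the old constructor call, which also took `options`, callers neither spell out a type argument nor carry `PipelineOptions` just to build the source.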
incubator-beam git commit: [BEAM-126] remove strict job name check
Repository: incubator-beam
Updated Branches:
refs/heads/master 5b5c0e28f -> 81d5ff5a5

[BEAM-126] remove strict job name check

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81d5ff5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81d5ff5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81d5ff5a

Branch: refs/heads/master
Commit: 81d5ff5a561ebcf323caea5bdc4363353e5e60dd
Parents: 5b5c0e2
Author: Maximilian Michels
Authored: Fri Mar 18 15:46:36 2016 +0100
Committer: Maximilian Michels
Committed: Fri Mar 18 16:01:48 2016 +0100

--
.../runners/flink/FlinkPipelineExecutionEnvironment.java | 4 ++--
.../org/apache/beam/runners/flink/FlinkPipelineRunner.java | 8
2 files changed, 2 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81d5ff5a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 8825ed3..6f93478 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -141,7 +141,7 @@ public class FlinkPipelineExecutionEnvironment {
if (this.flinkPipelineTranslator == null) {
throw new RuntimeException("FlinkPipelineTranslator not initialized.");
}
- return this.flinkStreamEnv.execute();
+ return this.flinkStreamEnv.execute(options.getJobName());
} else {
if (this.flinkBatchEnv == null) {
throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
@@ -149,7 +149,7 @@ public class FlinkPipelineExecutionEnvironment {
if (this.flinkPipelineTranslator == null) {
throw new RuntimeException("FlinkPipelineTranslator not initialized.");
}
- return this.flinkBatchEnv.execute();
+ return this.flinkBatchEnv.execute(options.getJobName());
}
}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81d5ff5a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index fe773d9..4f53e35 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -87,14 +87,6 @@ public class FlinkPipelineRunner extends PipelineRunner {
LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
}
-// Verify jobName according to service requirements.
-String jobName = flinkOptions.getJobName().toLowerCase();
-Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " +
-"the name must consist of only the characters " + "[-a-z0-9], starting with a letter " +
-"and ending with a letter " + "or number");
-Preconditions.checkArgument(jobName.length() <= 40,
-"JobName too long; must be no more than 40 characters in length");
-
// Set Flink Master to [auto] if no option was specified.
if (flinkOptions.getFlinkMaster() == null) {
flinkOptions.setFlinkMaster("[auto]");
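Before BEAM-126 the runner lowercased the job name and rejected it unless it matched `[a-z]([-a-z0-9]*[a-z0-9])?` and was at most 40 characters long; afterwards the name is simply passed to Flink's `execute(...)`. The stdlib-only sketch below re-creates the removed check (the class and method names are made up for illustration; only the pattern and the limit come from the diff) to show why a descriptive name such as the Kafka example's `"KafkaExample - WindowSize: ... seconds"` would have been refused.

```java
import java.util.regex.Pattern;

/** Hypothetical re-creation of the job-name check removed by BEAM-126, for illustration only. */
final class OldJobNameCheck {
  // Pattern and length limit copied from the removed Preconditions.checkArgument calls.
  private static final Pattern VALID_NAME = Pattern.compile("[a-z]([-a-z0-9]*[a-z0-9])?");
  private static final int MAX_LENGTH = 40;

  /** Returns true if the name would have passed the old check, which lowercased it first. */
  static boolean wouldHavePassed(String jobName) {
    String lowered = jobName.toLowerCase();
    return VALID_NAME.matcher(lowered).matches() && lowered.length() <= MAX_LENGTH;
  }

  public static void main(String[] args) {
    int windowSize = 10; // hypothetical value standing in for options.getWindowSize()
    String kafkaName = "KafkaExample - WindowSize: " + windowSize + " seconds";
    System.out.println(kafkaName + " -> " + wouldHavePassed(kafkaName));
  }
}
```

Because the check lowercased the name first, mixed case was never the problem; the spaces and the colon are what fail the pattern, whatever the window size.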
[09/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java new file mode 100644 index 000..10c8bbf --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.common.base.Preconditions; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.util.*; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.base.Throwables; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; + +import java.util.Collection; + +/** + * An abstract class that encapsulates the common code of the the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} + * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and + * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations. 
+ * */ +public abstract class FlinkAbstractParDoWrapperextends RichFlatMapFunction { + + private final DoFn doFn; + private final WindowingStrategy windowingStrategy; + private transient PipelineOptions options; + + private DoFnProcessContext context; + + public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, DoFn doFn) { +Preconditions.checkNotNull(options); +Preconditions.checkNotNull(windowingStrategy); +Preconditions.checkNotNull(doFn); + +this.doFn = doFn; +this.options = options; +this.windowingStrategy = windowingStrategy; + } + + private void initContext(DoFn function, Collector outCollector) { +if (this.context == null) { + this.context = new DoFnProcessContext(function, outCollector); +} + } + + @Override + public void flatMap(WindowedValue value, Collector out) throws Exception { +this.initContext(doFn, out); + +// for each window the element belongs to, create a new copy here. +Collection windows = value.getWindows(); +if (windows.size() <= 1) { + processElement(value); +} else { + for (BoundedWindow window : windows) { +processElement(WindowedValue.of( +value.getValue(), value.getTimestamp(), window, value.getPane())); + } +} + } + + private void processElement(WindowedValue value) throws Exception { +this.context.setElement(value); +this.doFn.startBundle(context); +doFn.processElement(context); +this.doFn.finishBundle(context); + } + + private class DoFnProcessContext extends DoFn .ProcessContext { + +private final DoFn fn; + +protected final Collector collector; + +private
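`flatMap` in the wrapper above processes an element that belongs to several windows once per window, each time as a copy that carries a single window plus the original value and timestamp. A stdlib-only sketch of just that step; `WindowExploder` and its nested `Value` type are hypothetical simplifications of the SDK's `WindowedValue`, with windows reduced to string ids and pane information left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the "one copy per window" step. */
final class WindowExploder {

  /** Simplified windowed element: a value, its timestamp and the ids of its windows. */
  static final class Value<T> {
    final T value;
    final long timestamp;
    final List<String> windows;

    Value(T value, long timestamp, List<String> windows) {
      this.value = value;
      this.timestamp = timestamp;
      this.windows = windows;
    }
  }

  /** Returns the element unchanged if it has at most one window, else one single-window copy per window. */
  static <T> List<Value<T>> explode(Value<T> in) {
    if (in.windows.size() <= 1) {
      return Collections.singletonList(in);
    }
    List<Value<T>> copies = new ArrayList<>();
    for (String window : in.windows) {
      copies.add(new Value<>(in.value, in.timestamp, Collections.singletonList(window)));
    }
    return copies;
  }

  public static void main(String[] args) {
    Value<String> element = new Value<>("hi", 42L, Arrays.asList("w1", "w2"));
    for (Value<String> copy : explode(element)) {
      System.out.println(copy.value + " @ " + copy.timestamp + " in " + copy.windows);
    }
  }
}
```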
[03/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java deleted file mode 100644 index cd5cd40..000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.io.BoundedSource; -import com.google.cloud.dataflow.sdk.io.Source; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.List; - -/** - * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a - * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}. - */ -public class SourceInputFormat implements InputFormat{ - private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); - - private final BoundedSource initialSource; - private transient PipelineOptions options; - - private BoundedSource.BoundedReader reader = null; - private boolean reachedEnd = true; - - public SourceInputFormat(BoundedSource initialSource, PipelineOptions options) { -this.initialSource = initialSource; -this.options = options; - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { -out.defaultWriteObject(); -ObjectMapper mapper = new ObjectMapper(); -mapper.writeValue(out, options); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { -in.defaultReadObject(); -ObjectMapper mapper = new ObjectMapper(); -options = mapper.readValue(in, PipelineOptions.class); - } - - @Override - public void configure(Configuration configuration) {} - - @Override - public void 
open(SourceInputSplit sourceInputSplit) throws IOException { -reader = ((BoundedSource) sourceInputSplit.getSource()).createReader(options); -reachedEnd = false; - } - - @Override - public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { -try { - final long estimatedSize = initialSource.getEstimatedSizeBytes(options); - - return new BaseStatistics() { -@Override -public long getTotalInputSize() { - return estimatedSize; - -} - -@Override -public long getNumberOfRecords() { - return BaseStatistics.NUM_RECORDS_UNKNOWN; -} - -@Override -public float getAverageRecordWidth() { - return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN; -} - }; -} catch (Exception e) { - LOG.warn("Could not read Source statistics: {}", e); -} - -return null; - } - - @Override - @SuppressWarnings("unchecked") - public SourceInputSplit[] createInputSplits(int numSplits) throws IOException { -long desiredSizeBytes; -try { - desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; - List> shards = initialSource.splitIntoBundles(desiredSizeBytes, - options); - List splits = new ArrayList<>(); - int splitCount = 0; - for (Source shard: shards) { -splits.add(new SourceInputSplit<>(shard, splitCount++)); -
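`SourceInputFormat` keeps its `PipelineOptions` in a `transient` field and writes them out by hand in `writeObject`/`readObject`, using Jackson's `ObjectMapper`, so that the format can be Java-serialized even though the options object itself is not. The sketch below shows the same pattern with the standard library only; `OptionsHolder` and `Options` are hypothetical, and a single `writeUTF` call stands in for the JSON encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical holder that serializes a non-Serializable field by hand. */
final class OptionsHolder implements Serializable {
  private static final long serialVersionUID = 1L;

  /** Hypothetical options type that deliberately does not implement Serializable. */
  static final class Options {
    final String jobName;

    Options(String jobName) {
      this.jobName = jobName;
    }
  }

  private final String sourceName;   // written by defaultWriteObject()
  private transient Options options; // skipped by default serialization, handled below

  OptionsHolder(String sourceName, Options options) {
    this.sourceName = sourceName;
    this.options = options;
  }

  String sourceName() {
    return sourceName;
  }

  Options options() {
    return options;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeUTF(options.jobName); // assumes a non-null job name
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    options = new Options(in.readUTF()); // read back in the same order it was written
  }

  /** Serializes and deserializes a holder, roughly what happens when a function is shipped to workers. */
  static OptionsHolder roundTrip(OptionsHolder holder) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(holder);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (OptionsHolder) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args) {
    OptionsHolder copy = roundTrip(new OptionsHolder("words", new Options("my-job")));
    System.out.println(copy.sourceName() + " / " + copy.options().jobName);
  }
}
```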
[07/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java -- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java new file mode 100644 index 000..e73c456 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + + +public class WordCountJoin2ITCase extends JavaProgramTestBase { + + static final String[] WORDS_1 = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + + static final String[] WORDS_2 = new String[] { + "hi tim", "beauty", "hooray sue bob", + "hi there", "", "please say hi"}; + + static final String[] RESULTS = new String[] { + "beauty -> Tag1: Tag2: 1", + "bob -> Tag1: 2 Tag2: 1", + "hi -> Tag1: 5 Tag2: 3", + "hooray -> Tag1: Tag2: 1", + "please -> Tag1: Tag2: 1", + "say -> Tag1: Tag2: 1", + "sue -> Tag1: 2 Tag2: 1", + "there -> Tag1: 1 Tag2: 1", + "tim -> Tag1: Tag2: 1" + }; + + static final TupleTag tag1 = new TupleTag<>("Tag1"); + static final TupleTag tag2 = new TupleTag<>("Tag2"); + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { +resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { +compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); + } + + @Override + protected void testProgram() throws Exception { +Pipeline p = FlinkTestPipeline.createForBatch(); + +/* Create two PCollections and join 
them */ +PCollection> occurences1 = p.apply(Create.of(WORDS_1)) +.apply(ParDo.of(new ExtractWordsFn())) +.apply(Count.perElement()); + +PCollection > occurences2 = p.apply(Create.of(WORDS_2)) +.apply(ParDo.of(new ExtractWordsFn())) +.apply(Count.perElement()); + +/* CoGroup the two collections */ +PCollection > mergedOccurences = KeyedPCollectionTuple +.of(tag1, occurences1) +.and(tag2, occurences2) +.apply(CoGroupByKey.create()); + +/* Format output */ +mergedOccurences.apply(ParDo.of(new FormatCountsFn())) +.apply(TextIO.Write.named("test").to(resultPath)); + +p.run(); + } + + + static class ExtractWordsFn extends DoFn { + +@Override +public void startBundle(Context c) { +} + +@Override +public void processElement(ProcessContext c) { + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { +if (!word.isEmpty()) { + c.output(word); +} + } +} + } + + static class FormatCountsFn extends DoFn , String> { +@Override +public void processElement(ProcessContext c) { + CoGbkResult value = c.element().getValue(); + String key = c.element().getKey(); + String
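The `RESULTS` array above encodes what the join should produce: each input is split on `[^a-zA-Z']+`, the words are counted, and the two counts are grouped by word, with a tag's count left blank when the word never occurs in that input. A plain-Java model of that expectation can help double-check the array by hand. `WordCountJoinModel` is a hypothetical helper, not Beam's `CoGroupByKey`, and the output format is inferred from `RESULTS` because the body of `FormatCountsFn` is cut off here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical in-memory model of the output WordCountJoin2ITCase expects. */
final class WordCountJoinModel {

  /** Splits each line like ExtractWordsFn does and counts every non-empty word. */
  static Map<String, Long> countWords(String[] lines) {
    Map<String, Long> counts = new TreeMap<>();
    for (String line : lines) {
      for (String word : line.split("[^a-zA-Z']+")) {
        if (!word.isEmpty()) {
          counts.merge(word, 1L, Long::sum);
        }
      }
    }
    return counts;
  }

  /** Groups both count maps by word; a count is left blank when the word is absent from that input. */
  static List<String> join(Map<String, Long> tag1, Map<String, Long> tag2) {
    SortedSet<String> words = new TreeSet<>(tag1.keySet());
    words.addAll(tag2.keySet());
    List<String> lines = new ArrayList<>();
    for (String word : words) {
      String left = "Tag1: " + (tag1.containsKey(word) ? tag1.get(word) + " " : "");
      String right = "Tag2: " + (tag2.containsKey(word) ? tag2.get(word) : "");
      lines.add(word + " -> " + left + right);
    }
    return lines;
  }

  public static void main(String[] args) {
    String[] words1 = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
    String[] words2 = {"hi tim", "beauty", "hooray sue bob", "hi there", "", "please say hi"};
    join(countWords(words1), countWords(words2)).forEach(System.out::println);
  }
}
```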
[10/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java new file mode 100644 index 000..ca667ee --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingInternals; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.collect.ImmutableList; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Encapsulates a {@link com.google.cloud.dataflow.sdk.transforms.DoFn} that uses side outputs + * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}. + * + * We get a mapping from {@link com.google.cloud.dataflow.sdk.values.TupleTag} to output index + * and must tag all outputs with the output number. Afterwards a filter will filter out + * those elements that are not to be in a specific output. 
+ */ +public class FlinkMultiOutputDoFnFunctionextends RichMapPartitionFunction { + + private final DoFn doFn; + private transient PipelineOptions options; + private final Map outputMap; + + public FlinkMultiOutputDoFnFunction(DoFn doFn, PipelineOptions options, Map outputMap) { +this.doFn = doFn; +this.options = options; +this.outputMap = outputMap; + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { +out.defaultWriteObject(); +ObjectMapper mapper = new ObjectMapper(); +mapper.writeValue(out, options); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { +in.defaultReadObject(); +ObjectMapper mapper = new ObjectMapper(); +options = mapper.readValue(in, PipelineOptions.class); + + } + + @Override + public void mapPartition(Iterable values, Collector out) throws Exception { +ProcessContext context = new ProcessContext(doFn, out); +this.doFn.startBundle(context); +for (IN value : values) { + context.inValue = value; + doFn.processElement(context); +} +this.doFn.finishBundle(context); + } + + private class ProcessContext extends DoFn .ProcessContext { + +IN inValue; +Collector outCollector; + +public ProcessContext(DoFn fn, Collector outCollector) { + fn.super(); + this.outCollector = outCollector; +} + +@Override +public IN element() { + return this.inValue; +} + +@Override +public Instant
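The Javadoc of `FlinkMultiOutputDoFnFunction` describes the side-output strategy: map each `TupleTag` to an output index, tag every emitted element with that index, and later filter the combined stream once per output. A stdlib-only sketch of that strategy, in which plain strings stand in for `TupleTag`s and the hypothetical `Tagged` class stands in for a union value (the imports suggest the real code uses `RawUnionValue` for this):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "tag every output with its index, then filter per output". */
final class MultiOutputSketch {

  /** Stand-in for a union value: the output index plus the emitted element. */
  static final class Tagged {
    final int outputIndex;
    final Object value;

    Tagged(int outputIndex, Object value) {
      this.outputIndex = outputIndex;
      this.value = value;
    }
  }

  /** Tags an element emitted to the named output with that output's index. */
  static Tagged tag(Map<String, Integer> outputMap, String output, Object value) {
    Integer index = outputMap.get(output);
    if (index == null) {
      throw new IllegalArgumentException("Unknown output: " + output);
    }
    return new Tagged(index, value);
  }

  /** Keeps only the elements destined for one output and unwraps them. */
  static List<Object> select(List<Tagged> all, int outputIndex) {
    List<Object> selected = new ArrayList<>();
    for (Tagged t : all) {
      if (t.outputIndex == outputIndex) {
        selected.add(t.value);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    Map<String, Integer> outputMap = new HashMap<>();
    outputMap.put("main", 0);
    outputMap.put("lengths", 1);
    List<Tagged> emitted = new ArrayList<>();
    for (String word : Arrays.asList("hi", "there")) {
      emitted.add(tag(outputMap, "main", word));
      emitted.add(tag(outputMap, "lengths", word.length()));
    }
    System.out.println("main: " + select(emitted, 0));
    System.out.println("lengths: " + select(emitted, 1));
  }
}
```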
[05/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
--
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
deleted file mode 100644
index 71e3b54..000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.io;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-/**
- * Transform for printing the contents of a {@link com.google.cloud.dataflow.sdk.values.PCollection}.
- * to standard output.
- *
- * This is Flink-specific and will only work when executed using the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
- */
-public class ConsoleIO {
-
-  /**
-   * A PTransform that writes a PCollection to a standard output.
-   */
-  public static class Write {
-
-    /**
-     * Returns a ConsoleIO.Write PTransform with a default step name.
-     */
-    public static Bound create() {
-      return new Bound();
-    }
-
-    /**
-     * Returns a ConsoleIO.Write PTransform with the given step name.
-     */
-    public static Bound named(String name) {
-      return new Bound().named(name);
-    }
-
-    /**
-     * A PTransform that writes a bounded PCollection to standard output.
-     */
-    public static class Bound extends PTransform{
-      private static final long serialVersionUID = 0;
-
-      Bound() {
-        super("ConsoleIO.Write");
-      }
-
-      Bound(String name) {
-        super(name);
-      }
-
-      /**
-       * Returns a new ConsoleIO.Write PTransform that's like this one but with the given
-       * step
-       * name. Does not modify this object.
-       */
-      public Bound named(String name) {
-        return new Bound(name);
-      }
-
-      @Override
-      public PDone apply(PCollection input) {
-        return PDone.in(input.getPipeline());
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
--
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
deleted file mode 100644
index 28a10b7..000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import
[12/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/pom.xml -- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index c8c5d84..31713cd 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -1,261 +1,167 @@ http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd; -xmlns="http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;> - -4.0.0 - - -org.apache.beam -runners -1.6.0-SNAPSHOT - - -flink-runner -0.3-SNAPSHOT - -Flink Beam Runner -jar - -2015 - - - -The Apache Software License, Version 2.0 -http://www.apache.org/licenses/LICENSE-2.0.txt -repo - - - - -UTF-8 - UTF-8 -1.0.0 -1.6.0-SNAPSHOT -2.10 - -org.apache.beam.runners.flink.examples.WordCount -kinglear.txt -wordcounts.txt -1 - - - - -apache.snapshots -Apache Development Snapshot Repository - https://repository.apache.org/content/repositories/snapshots/ - -false - - -true - - - - - - -org.apache.flink -flink-core -${flink.version} - - -org.apache.flink - flink-streaming-java_${scala.major.version} -${flink.version} - - -org.apache.flink - flink-streaming-java_${scala.major.version} -${flink.version} -test -test-jar - - -org.apache.flink -flink-java -${flink.version} - - -org.apache.flink -flink-clients_${scala.major.version} -${flink.version} - - -org.apache.flink -flink-test-utils_${scala.major.version} -${flink.version} -test - - -org.apache.flink - flink-connector-kafka-0.8_${scala.major.version} -${flink.version} - - -org.apache.flink -flink-avro_${scala.major.version} -${flink.version} - - -com.google.cloud.dataflow -google-cloud-dataflow-java-sdk-all -${beam.version} - - -org.slf4j -slf4j-jdk14 - - - - -org.mockito -mockito-all -1.9.5 -test - - - - - - - - -org.apache.maven.plugins -maven-jar-plugin -2.6 - - - - true - true - - - - - - - - -org.apache.maven.plugins -maven-compiler-plugin -3.1 - -1.7 -1.7 - - - - - -maven-failsafe-plugin -2.17 - - - -integration-test -verify - - - - -1 
--Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit - - - - - -org.apache.maven.plugins -maven-surefire-plugin -2.17 - --Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit - - - - - -org.apache.maven.plugins -maven-eclipse-plugin -2.8 - - - org.eclipse.jdt.launching.JRE_CONTAINER - -true -true - - - - - -org.apache.maven.plugins -maven-enforcer-plugin -1.3.1 - - -enforce-maven - -enforce - - - - -[1.7,) - -