[GitHub] incubator-beam pull request #1283: [BEAM-896] adjust ReadSourceITCase to exc...

2016-11-04 Thread mxm
Github user mxm closed the pull request at:

https://github.com/apache/incubator-beam/pull/1283


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1283: [BEAM-896] adjust ReadSourceITCase to exc...

2016-11-04 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/1283

[BEAM-896] adjust ReadSourceITCase to exclude Beam temporary files

This should fix the test failures in `ReadSourceITCase` caused by #1050.

@dhalperi Wouldn't it be nice to have the temporary folder prefix somewhere 
accessible as a static variable?
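
For context, a minimal sketch of the kind of filtering the test can apply, assuming
the temporary files share a common name prefix (the "temp-beam-" prefix and the
helper below are assumptions for illustration, not constants or code from the SDK):

    import java.io.File;
    import java.io.FilenameFilter;
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical test helper: collect only the real output files. */
    class OutputFiles {
      static List<File> listResultFiles(File outputDir) {
        List<File> results = new ArrayList<>();
        File[] files = outputDir.listFiles(new FilenameFilter() {
          @Override
          public boolean accept(File dir, String name) {
            // skip Beam's temporary output (prefix assumed for illustration)
            return !name.startsWith("temp-beam-");
          }
        });
        if (files != null) {
          for (File file : files) {
            results.add(file);
          }
        }
        return results;
      }
    }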



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-896

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1283.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1283


commit eb1fcb35b556ceb099324eb956716f31b5badc61
Author: Maximilian Michels <m...@apache.org>
Date:   2016-11-04T09:50:18Z

[BEAM-896] adjust ReadSourceITCase to exclude Beam temporary files






[2/2] incubator-beam git commit: This closes #1093

2016-10-18 Thread mxm
This closes #1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b5ff4c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b5ff4c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b5ff4c4

Branch: refs/heads/master
Commit: 6b5ff4c4aad5e4d1419b1a147153b0f8d72324ae
Parents: a2c342c 76434df
Author: Maximilian Michels 
Authored: Tue Oct 18 16:59:58 2016 +0200
Committer: Maximilian Michels 
Committed: Tue Oct 18 16:59:58 2016 +0200

--
 .../flink/FlinkDetachedRunnerResult.java| 76 
 .../apache/beam/runners/flink/FlinkRunner.java  |  9 ++-
 .../beam/runners/flink/FlinkRunnerResult.java   | 11 +--
 .../beam/runners/flink/TestFlinkRunner.java |  9 ++-
 4 files changed, 91 insertions(+), 14 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-593] avoid throwing Exception in waitUntilFinish

2016-10-18 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master a2c342cfd -> 6b5ff4c4a


[BEAM-593] avoid throwing Exception in waitUntilFinish

The current implementation of Flink's PipelineResult assumes that the
pipeline has already been processed. Hence, we can return State.DONE
when waitUntilFinish is called.

Additionally, we introduce a PipelineResult for detached execution which
returns State.UNKNOWN for now.
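
As a rough sketch of the attached-mode side of this change (the FlinkRunnerResult
diff is only summarized below, so the exact method bodies here are an assumption;
Duration is org.joda.time.Duration):

    @Override
    public State waitUntilFinish() {
      // the batch job has already run to completion when this result is created
      return State.DONE;
    }

    @Override
    public State waitUntilFinish(Duration duration) {
      return waitUntilFinish();
    }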


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76434dff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76434dff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76434dff

Branch: refs/heads/master
Commit: 76434dff650196c74afdeac917d8ceddb2550079
Parents: a2c342c
Author: Maximilian Michels 
Authored: Thu Oct 13 14:01:06 2016 +0200
Committer: Maximilian Michels 
Committed: Tue Oct 18 16:59:49 2016 +0200

--
 .../flink/FlinkDetachedRunnerResult.java| 76 
 .../apache/beam/runners/flink/FlinkRunner.java  |  9 ++-
 .../beam/runners/flink/FlinkRunnerResult.java   | 11 +--
 .../beam/runners/flink/TestFlinkRunner.java |  9 ++-
 4 files changed, 91 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76434dff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
new file mode 100644
index 000..6adcf07
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.commons.lang.NotImplementedException;
+import org.joda.time.Duration;
+
+
+/**
+ * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink.
+ * In detached execution, results and job execution are currently unavailable.
+ */
+public class FlinkDetachedRunnerResult implements PipelineResult {
+
+  FlinkDetachedRunnerResult() {}
+
+  @Override
+  public State getState() {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    throw new AggregatorRetrievalException(
+        "Accumulators can't be retrieved for detached Job executions.",
+        new NotImplementedException());
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException(
+        "The FlinkRunner does not currently support metrics.");
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    throw new UnsupportedOperationException("Cancelling is not yet supported.");
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public String toString() {
+    return "FlinkDetachedRunnerResult{}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76434dff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 932952d..12e21c7 100644
--- 

[GitHub] incubator-beam pull request #1093: [BEAM-593] avoid throwing Exception in wa...

2016-10-13 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/1093

[BEAM-593] avoid throwing Exception in waitUntilFinish

The current implementation of Flink's `PipelineResult` assumes that the 
pipeline has already been processed. Hence, we can return State.DONE when 
`waitUntilFinish()` is called.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-593

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1093.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1093


commit c4b78841a82951b5f7c3e1c7763e078317bb3a2f
Author: Maximilian Michels <m...@apache.org>
Date:   2016-10-13T12:01:06Z

[BEAM-593] avoid throwing Exception in waitUntilFinish

The current implementation of Flink's PipelineResult assumes that the
pipeline has already been processed. Hence, we can return State.DONE
when waitUntilFinish is called.






[1/2] incubator-beam git commit: Fix inconsistency in log formatting: leaveCompositeTransform always decrements depth, but enterCompositeTransform increments depth only on ENTER_TRANSFORM

2016-10-13 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 13b45895e -> 73226168a


Fix inconsistency in log formatting: leaveCompositeTransform always decrements 
depth, but enterCompositeTransform increments depth only on ENTER_TRANSFORM
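
A small self-contained illustration (not Beam code) of the bookkeeping this
restores: the counter must be incremented on every enter and decremented on every
leave, so the indentation of the translation log stays balanced even for composites
that are translated directly:

    class DepthDemo {
      private int depth = 0;

      private String genSpaces(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
          sb.append("|   ");
        }
        return sb.toString();
      }

      void enter(String name) {
        System.out.println(genSpaces(depth) + "enterCompositeTransform- " + name);
        depth++;  // always increment, whether or not the composite is expanded
      }

      void leave(String name) {
        depth--;  // leave always decrements, so enter must always increment
        System.out.println(genSpaces(depth) + "leaveCompositeTransform- " + name);
      }
    }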


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cea201ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cea201ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cea201ea

Branch: refs/heads/master
Commit: cea201eaaea24d8cc1e117645d1c81f379beeb41
Parents: 98da6e8
Author: Alexey Diomin 
Authored: Wed Aug 31 18:17:01 2016 +0400
Committer: Alexey Diomin 
Committed: Wed Aug 31 18:17:54 2016 +0400

--
 .../runners/flink/translation/FlinkBatchPipelineTranslator.java| 2 +-
 .../flink/translation/FlinkStreamingPipelineTranslator.java| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cea201ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 66c48b0..1cb604f 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -65,6 +65,7 @@ public class FlinkBatchPipelineTranslator extends 
FlinkPipelineTranslator {
   @Override
   public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
 LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + 
formatNodeName(node));
+this.depth++;
 
 BatchTransformTranslator translator = getTranslator(node);
 
@@ -73,7 +74,6 @@ public class FlinkBatchPipelineTranslator extends 
FlinkPipelineTranslator {
   LOG.info(genSpaces(this.depth) + "translated-" + formatNodeName(node));
   return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
 } else {
-  this.depth++;
   return CompositeBehavior.ENTER_TRANSFORM;
 }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cea201ea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 284cd23..e5c0d76 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -52,6 +52,7 @@ public class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
   @Override
   public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
 LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + 
formatNodeName(node));
+this.depth++;
 
 PTransform transform = node.getTransform();
 if (transform != null) {
@@ -64,7 +65,6 @@ public class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
 return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
   }
 }
-this.depth++;
 return CompositeBehavior.ENTER_TRANSFORM;
   }
 



[2/2] incubator-beam git commit: This closes #908

2016-10-13 Thread mxm
This closes #908


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/73226168
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/73226168
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/73226168

Branch: refs/heads/master
Commit: 73226168a436f88bf650e36b45434c2dbe399ae2
Parents: 13b4589 cea201e
Author: Maximilian Michels 
Authored: Thu Oct 13 10:43:30 2016 +0200
Committer: Maximilian Michels 
Committed: Thu Oct 13 10:43:30 2016 +0200

--
 .../runners/flink/translation/FlinkBatchPipelineTranslator.java| 2 +-
 .../flink/translation/FlinkStreamingPipelineTranslator.java| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/73226168/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
--



[2/2] incubator-beam git commit: This closes #1021

2016-09-28 Thread mxm
This closes #1021


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1ac

Branch: refs/heads/master
Commit: a1acdca13f6902faad30f37a877c1e6fb218
Parents: b5853a6 59f6231
Author: Maximilian Michels 
Authored: Wed Sep 28 18:46:58 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Sep 28 18:46:58 2016 +0200

--
 .../wrappers/streaming/WindowDoFnOperator.java  | 179 +--
 1 file changed, 165 insertions(+), 14 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator

2016-09-28 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master b5853a624 -> a1acd


[BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59f62318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59f62318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59f62318

Branch: refs/heads/master
Commit: 59f623189184b225723ebd5686d912aa296ce35b
Parents: 3879db0
Author: Aljoscha Krettek 
Authored: Wed Sep 28 11:49:54 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Sep 28 18:46:30 2016 +0200

--
 .../wrappers/streaming/WindowDoFnOperator.java  | 179 +--
 1 file changed, 165 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59f62318/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 14a3ca7..e06a783 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -26,12 +29,14 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -53,12 +58,15 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.joda.time.Instant;
 
@@ -69,7 +77,8 @@ import org.joda.time.Instant;
  * @param 
  */
 public class WindowDoFnOperator
-extends DoFnOperator, KV, 
WindowedValue>> {
+extends DoFnOperator, KV, 
WindowedValue>>
+implements Triggerable {
 
   private final Coder keyCoder;
   private final TimerInternals.TimerDataCoder timerCoder;
@@ -77,6 +86,11 @@ public class WindowDoFnOperator
   private transient Set> 
watermarkTimers;
   private transient Queue> 
watermarkTimersQueue;
 
+  private transient Queue> 
processingTimeTimersQueue;
+  private transient Set> 
processingTimeTimers;
+  private transient Multiset processingTimeTimerTimestamps;
+  private transient Map processingTimeTimerFutures;
+
   private FlinkStateInternals stateInternals;
 
   private final SystemReduceFn 
systemReduceFn;
@@ -151,6 +165,24 @@ public class WindowDoFnOperator
   });
 }
 
+if (processingTimeTimers == null) {
+  processingTimeTimers = new HashSet<>();
+  processingTimeTimerTimestamps = HashMultiset.create();
+  processingTimeTimersQueue = new PriorityQueue<>(
+  10,
+  new Comparator>() {
+@Override
+public int compare(
+

[3/3] incubator-beam git commit: This closes #967

2016-09-28 Thread mxm
This closes #967


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3879db03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3879db03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3879db03

Branch: refs/heads/master
Commit: 3879db03657dd9331977313d5f3ab30d5f163b60
Parents: db47c63 f3f2a97
Author: Maximilian Michels 
Authored: Tue Sep 27 11:15:55 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Sep 28 11:16:37 2016 +0200

--
 .../streaming/io/UnboundedSourceWrapper.java| 96 
 .../streaming/UnboundedSourceWrapperTest.java   | 10 ++
 2 files changed, 88 insertions(+), 18 deletions(-)
--




[2/3] incubator-beam git commit: fix potential NPE in checkpointing of UnboundedSourceWrapper

2016-09-28 Thread mxm
fix potential NPE in checkpointing of UnboundedSourceWrapper

This moves all the initialization code to the open() method which ensures
that no snapshot can occur before the state has been initialized correctly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f3f2a977
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f3f2a977
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f3f2a977

Branch: refs/heads/master
Commit: f3f2a9779a5c355a5902a783f3e72609ff71717f
Parents: cf14e80
Author: Maximilian Michels 
Authored: Fri Sep 16 18:42:43 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Sep 28 11:14:21 2016 +0200

--
 .../streaming/io/UnboundedSourceWrapper.java| 39 
 .../streaming/UnboundedSourceWrapperTest.java   |  3 ++
 2 files changed, 27 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3f2a977/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 64cf703..68a83e8 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -91,6 +91,7 @@ public class UnboundedSourceWrapper<
   private transient List 
localReaders;
 
   /**
+   * Flag to indicate whether the source is running.
* Initialize here and not in run() to prevent races where we cancel a job 
before run() is
* ever called or run() is called after cancel().
*/
@@ -154,19 +155,17 @@ public class UnboundedSourceWrapper<
 splitSources = source.generateInitialSplits(parallelism, pipelineOptions);
   }
 
-  @Override
-  public void run(SourceContext ctx) throws Exception {
-if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
-  throw new RuntimeException(
-  "Cannot emit watermarks, this hints at a misconfiguration/bug.");
-}
 
-context = (StreamSource.ManualWatermarkContext) 
ctx;
+  /**
+   * Initialize and restore state before starting execution of the source.
+   */
+  @Override
+  public void open(Configuration parameters) throws Exception {
 runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
 
 // figure out which split sources we're responsible for
-int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
+int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
 
 localSplitSources = new ArrayList<>();
 localReaders = new ArrayList<>();
@@ -183,12 +182,12 @@ public class UnboundedSourceWrapper<
   new Function<
   KV, 
CheckpointMarkT>,
   UnboundedSource>() {
-@Override
-public UnboundedSource apply(
-KV, 
CheckpointMarkT> input) {
-  return input.getKey();
-}
-  });
+@Override
+public UnboundedSource apply(
+KV, 
CheckpointMarkT> input) {
+  return input.getKey();
+}
+  });
 
   for (KV, 
CheckpointMarkT> restored:
   restoredState) {
@@ -215,6 +214,16 @@ public class UnboundedSourceWrapper<
 subtaskIndex,
 numSubtasks,
 localSplitSources);
+  }
+
+  @Override
+  public void run(SourceContext ctx) throws Exception {
+if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+  throw new RuntimeException(
+  "Cannot emit watermarks, this hints at a misconfiguration/bug.");
+}
+
+context = (StreamSource.ManualWatermarkContext) 
ctx;
 
 if (localReaders.size() == 0) {
   // do nothing, but still look busy ...

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3f2a977/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
--
diff --git 

[1/3] incubator-beam git commit: [BEAM-283] finalize CheckpointMarks upon completed checkpoint

2016-09-28 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master db47c63ab -> 3879db036


[BEAM-283] finalize CheckpointMarks upon completed checkpoint


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf14e809
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf14e809
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf14e809

Branch: refs/heads/master
Commit: cf14e809d4a790c407ab7c3cf1c90bb436a86dc9
Parents: c403675
Author: Maximilian Michels 
Authored: Fri Sep 16 17:04:22 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 16 20:48:52 2016 +0200

--
 .../streaming/io/UnboundedSourceWrapper.java| 57 ++--
 .../streaming/UnboundedSourceWrapperTest.java   |  7 +++
 2 files changed, 61 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cf14e809/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 7fdc816..64cf703 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -22,6 +22,8 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
@@ -38,6 +40,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -54,7 +57,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceWrapper<
 OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
 extends RichParallelSourceFunction
-implements Triggerable, StoppableFunction, Checkpointed {
+implements Triggerable, StoppableFunction, Checkpointed, 
CheckpointListener {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
@@ -106,6 +109,15 @@ public class UnboundedSourceWrapper<
   private transient 
StreamSource.ManualWatermarkContext context;
 
   /**
+   * Pending checkpoints which have not been acknowledged yet.
+   */
+  private transient LinkedHashMap 
pendingCheckpoints;
+  /**
+   * Keep a maximum of 32 checkpoints for {@code 
CheckpointMark.finalizeCheckpoint()}.
+   */
+  private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32;
+
+  /**
* When restoring from a snapshot we put the restored sources/checkpoint 
marks here
* and open in {@link #open(Configuration)}.
*/
@@ -159,6 +171,8 @@ public class UnboundedSourceWrapper<
 localSplitSources = new ArrayList<>();
 localReaders = new ArrayList<>();
 
+pendingCheckpoints = new LinkedHashMap<>();
+
 if (restoredState != null) {
 
   // restore the splitSources from the checkpoint to ensure consistent 
ordering
@@ -324,7 +338,7 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
-  public byte[] snapshotState(long l, long l1) throws Exception {
+  public byte[] snapshotState(long checkpointId, long checkpointTimestamp) 
throws Exception {
 
 if (checkpointCoder == null) {
   // no checkpoint coder available in this source
@@ -335,7 +349,8 @@ public class UnboundedSourceWrapper<
 // than we have a correct mapping of checkpoints to sources when
 // restoring
 List> checkpoints =
-new ArrayList<>();
+new ArrayList<>(localSplitSources.size());
+List checkpointMarks = new 
ArrayList<>(localSplitSources.size());
 
 for (int i = 0; i < localSplitSources.size(); i++) {
   UnboundedSource source = 
localSplitSources.get(i);
@@ 

[1/2] incubator-beam git commit: [BEAM-642] Support Flink Detached Mode for JOB execution

2016-09-22 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master f62d04e22 -> 843275210


[BEAM-642] Support Flink Detached Mode for JOB execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc69bc48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc69bc48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc69bc48

Branch: refs/heads/master
Commit: dc69bc48b0057f45d849d3cfec848fa066ee0854
Parents: f62d04e
Author: Sumit Chawla 
Authored: Mon Sep 19 15:10:53 2016 -0700
Committer: Maximilian Michels 
Committed: Thu Sep 22 11:30:09 2016 +0200

--
 .../apache/beam/runners/flink/FlinkRunner.java  | 25 +---
 1 file changed, 16 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc69bc48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index d3c65c0..137fdeb 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +57,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.DetachedEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,18 +153,23 @@ public class FlinkRunner extends 
PipelineRunner {
   throw new RuntimeException("Pipeline execution failed", e);
 }
 
-LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-
-Map accumulators = result.getAllAccumulatorResults();
-if (accumulators != null && !accumulators.isEmpty()) {
-  LOG.info("Final aggregator values:");
+if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
+  LOG.info("Pipeline submitted in Detached mode");
+  Map accumulators = Collections.emptyMap();
+  return new FlinkRunnerResult(accumulators, -1L);
+} else {
+  LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+  Map accumulators = result.getAllAccumulatorResults();
+  if (accumulators != null && !accumulators.isEmpty()) {
+LOG.info("Final aggregator values:");
 
-  for (Map.Entry entry : 
result.getAllAccumulatorResults().entrySet()) {
-LOG.info("{} : {}", entry.getKey(), entry.getValue());
+for (Map.Entry entry : 
result.getAllAccumulatorResults().entrySet()) {
+  LOG.info("{} : {}", entry.getKey(), entry.getValue());
+}
   }
-}
 
-return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+  return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+}
   }
 
   /**



[GitHub] incubator-beam pull request #967: [BEAM-283] finalize CheckpointMarks upon c...

2016-09-16 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/967

[BEAM-283] finalize CheckpointMarks upon completed checkpoint

See the first commit for an addition to the `UnboundedSourceWrapper` to 
call `finalizeCheckpoint()` on `CheckpointMark`s upon checkpoint completion.

The second commit fixes a potential race condition upon startup of the 
source wrapper when checkpointing is enabled.
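
For reference, a rough sketch of the finalization pattern the first commit
introduces (the notifyCheckpointComplete body is cut off in the diffs, so its shape
and the pendingCheckpoints generics shown here are assumptions):

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
      // finalize and drop every pending checkpoint up to the acknowledged one
      Iterator<Map.Entry<Long, List<CheckpointMarkT>>> it =
          pendingCheckpoints.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<Long, List<CheckpointMarkT>> entry = it.next();
        if (entry.getKey() <= checkpointId) {
          for (CheckpointMarkT mark : entry.getValue()) {
            mark.finalizeCheckpoint();  // tell the Beam source its data is durable
          }
          it.remove();
        } else {
          break;
        }
      }
    }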

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-283

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/967.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #967


commit cf14e809d4a790c407ab7c3cf1c90bb436a86dc9
Author: Maximilian Michels <m...@apache.org>
Date:   2016-09-16T15:04:22Z

[BEAM-283] finalize CheckpointMarks upon completed checkpoint

commit b82208b22adab641f3c469bef622d0a76be88d68
Author: Maximilian Michels <m...@apache.org>
Date:   2016-09-16T16:42:43Z

fix potential NPE in checkpointing of UnboundedSourceWrapper

This moves all the initialization code to the open() method which ensures
that no snapshot can occur before the state has been initialized correctly.






[2/2] incubator-beam git commit: This closes #929

2016-09-09 Thread mxm
This closes #929


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9326c8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9326c8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9326c8b

Branch: refs/heads/master
Commit: e9326c8b19c74a070b8ce8612af25b79dfb537ab
Parents: b6205ff de6ec82
Author: Maximilian Michels 
Authored: Fri Sep 9 16:17:03 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 9 16:17:03 2016 +0200

--
 .../wrappers/streaming/WindowDoFnOperator.java  | 12 +++-
 1 file changed, 11 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9326c8b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
--



[1/2] incubator-beam git commit: [flink] initialize watermarkTimeQueue with Comparator

2016-09-09 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master b6205ffa3 -> e9326c8b1


[flink] initialize watermarkTimeQueue with Comparator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de6ec823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de6ec823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de6ec823

Branch: refs/heads/master
Commit: de6ec8238f16f7505eb17ffa293208dabfa3431a
Parents: 26635d7
Author: Maximilian Michels 
Authored: Wed Sep 7 16:49:38 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Sep 7 16:51:24 2016 +0200

--
 .../wrappers/streaming/WindowDoFnOperator.java  | 12 +++-
 1 file changed, 11 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de6ec823/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 29ae6ae..075f5df 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -265,7 +265,17 @@ public class WindowDoFnOperator
 int numWatermarkTimers = dataIn.readInt();
 
 watermarkTimers = new HashSet<>(numWatermarkTimers);
-watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 
1));
+
+watermarkTimersQueue = new PriorityQueue<>(
+Math.max(numWatermarkTimers, 1),
+new Comparator>() {
+  @Override
+  public int compare(
+  Tuple2 o1,
+  Tuple2 o2) {
+return o1.f1.compareTo(o2.f1);
+  }
+});
 
 for (int i = 0; i < numWatermarkTimers; i++) {
   int length = dataIn.readInt();



[2/5] incubator-beam git commit: [BEAM-333][flink] make bounded/unbounded sources stoppable

2016-09-09 Thread mxm
[BEAM-333][flink] make bounded/unbounded sources stoppable


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e2820b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e2820b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e2820b0

Branch: refs/heads/master
Commit: 7e2820b06c19d958cbf7316ae28def7fe796a360
Parents: be689df
Author: Maximilian Michels 
Authored: Tue Sep 6 16:38:43 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 9 16:06:42 2016 +0200

--
 .../wrappers/streaming/io/BoundedSourceWrapper.java | 9 -
 .../wrappers/streaming/io/UnboundedSourceWrapper.java   | 8 +++-
 2 files changed, 15 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 3cb93c0..df49a49 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -37,7 +38,8 @@ import org.slf4j.LoggerFactory;
  * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink 
Source.
  */
 public class BoundedSourceWrapper
-extends RichParallelSourceFunction {
+extends RichParallelSourceFunction
+implements StoppableFunction {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BoundedSourceWrapper.class);
 
@@ -206,6 +208,11 @@ public class BoundedSourceWrapper
 isRunning = false;
   }
 
+  @Override
+  public void stop() {
+this.isRunning = false;
+  }
+
   /**
* Visible so that we can check this in tests. Must not be used for anything 
else.
*/

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..debf52f 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceWrapper<
 OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
 extends RichParallelSourceFunction
-implements Triggerable, Checkpointed {
+implements Triggerable, StoppableFunction, Checkpointed {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
@@ -311,6 +312,11 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
+  public void stop() {
+isRunning = false;
+  }
+
+  @Override
   public byte[] snapshotState(long l, long l1) throws Exception {
 
 if (checkpointCoder == null) {



[1/5] incubator-beam git commit: [BEAM-619] keep track of local split sources in UnboundedSourceWrapper

2016-09-09 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master a96ea98a4 -> b6205ffa3


[BEAM-619] keep track of local split sources in UnboundedSourceWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/145ad47d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/145ad47d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/145ad47d

Branch: refs/heads/master
Commit: 145ad47d9f945f816be7a91001cdf7cb3b6a7fac
Parents: be689df
Author: Maximilian Michels 
Authored: Wed Sep 7 13:07:15 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Sep 7 13:15:54 2016 +0200

--
 .../streaming/io/UnboundedSourceWrapper.java| 79 +++-
 1 file changed, 43 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/145ad47d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..2cd06ed 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -58,7 +58,7 @@ public class UnboundedSourceWrapper<
   private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
   /**
-   * Keep the options so that we can initialize the readers.
+   * Keep the options so that we can initialize the localReaders.
*/
   private final SerializedPipelineOptions serializedOptions;
 
@@ -72,13 +72,19 @@ public class UnboundedSourceWrapper<
* The split sources. We split them in the constructor to ensure that all 
parallel
* sources are consistent about the split sources.
*/
-  private List> 
splitSources;
+  private final List> 
splitSources;
 
   /**
+   * The local split sources. Assigned at runtime when the wrapper is executed 
in parallel.
+   */
+  private transient List> 
localSplitSources;
+
+  /**
+   * The local split readers. Assigned at runtime when the wrapper is executed 
in parallel.
* Make it a field so that we can access it in {@link #trigger(long)} for
* emitting watermarks.
*/
-  private transient List readers;
+  private transient List 
localReaders;
 
   /**
* Initialize here and not in run() to prevent races where we cancel a job 
before run() is
@@ -149,26 +155,15 @@ public class UnboundedSourceWrapper<
 int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
 int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
 
-List> localSources = new 
ArrayList<>();
-
-for (int i = 0; i < splitSources.size(); i++) {
-  if (i % numSubtasks == subtaskIndex) {
-localSources.add(splitSources.get(i));
-  }
-}
+localSplitSources = new ArrayList<>();
+localReaders = new ArrayList<>();
 
-LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
-subtaskIndex,
-numSubtasks,
-localSources);
-
-readers = new ArrayList<>();
 if (restoredState != null) {
 
   // restore the splitSources from the checkpoint to ensure consistent 
ordering
   // do it using a transform because otherwise we would have to do
   // unchecked casts
-  splitSources = Lists.transform(
+  localSplitSources = Lists.transform(
   restoredState,
   new Function<
   KV, 
CheckpointMarkT>,
@@ -182,19 +177,31 @@ public class UnboundedSourceWrapper<
 
   for (KV, 
CheckpointMarkT> restored:
   restoredState) {
-readers.add(
+localReaders.add(
 restored.getKey().createReader(
 serializedOptions.getPipelineOptions(), restored.getValue()));
   }
   restoredState = null;
 } else {
-  // initialize readers from scratch
-  for (UnboundedSource source : localSources) {
-
readers.add(source.createReader(serializedOptions.getPipelineOptions(), null));
+  // initialize localReaders and localSources from scratch
+  for (int i = 0; i < splitSources.size(); i++) {
+if (i % numSubtasks == subtaskIndex) {
+  

[5/5] incubator-beam git commit: This closes #927

2016-09-09 Thread mxm
This closes #927


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6205ffa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6205ffa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6205ffa

Branch: refs/heads/master
Commit: b6205ffa309af4e21ea2f63a211caae4961b81b1
Parents: c78db9a 4afd25a
Author: Maximilian Michels 
Authored: Fri Sep 9 16:10:55 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 9 16:10:55 2016 +0200

--
 .../streaming/io/UnboundedSourceWrapper.java|  87 --
 .../streaming/UnboundedSourceWrapperTest.java   | 113 +++
 2 files changed, 93 insertions(+), 107 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6205ffa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
--



[4/5] incubator-beam git commit: [BEAM-619] extend test case to be parameterized

2016-09-09 Thread mxm
[BEAM-619] extend test case to be parameterized

- extend test case with number of tasks and splits parameters
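
A hypothetical sketch of what the parameterization can look like (parameter names
and values are assumptions for illustration; the imports of Arrays, Collection and
Parameterized appear in the diff below):

    @Parameterized.Parameters(name = "numTasks = {0}, numSplits = {1}")
    public static Collection<Object[]> data() {
      return Arrays.asList(new Object[][] {
          {1, 1}, {1, 2}, {2, 1}, {2, 4},
      });
    }

    private final int numTasks;
    private final int numSplits;

    public UnboundedSourceWrapperTest(int numTasks, int numSplits) {
      this.numTasks = numTasks;
      this.numSplits = numSplits;
    }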


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4afd25a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4afd25a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4afd25a7

Branch: refs/heads/master
Commit: 4afd25a7a85a24ff0212a4791661d3c5e105662b
Parents: 145ad47
Author: Maximilian Michels 
Authored: Wed Sep 7 14:23:12 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 9 16:09:44 2016 +0200

--
 .../streaming/io/UnboundedSourceWrapper.java|   8 ++
 .../streaming/UnboundedSourceWrapperTest.java   | 113 +++
 2 files changed, 50 insertions(+), 71 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 2cd06ed..a62a754 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -397,4 +397,12 @@ public class UnboundedSourceWrapper<
   public List> 
getSplitSources() {
 return splitSources;
   }
+
+  /**
+   * Visible so that we can check this in tests. Must not be used for anything 
else.
+   */
+  @VisibleForTesting
+  public List> 
getLocalSplitSources() {
+return localSplitSources;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 73124a9..0cc584e 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -44,78 +46,43 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Tests for {@link UnboundedSourceWrapper}.
  */
+@RunWith(Parameterized.class)
 public class UnboundedSourceWrapperTest {
 
-  /**
-   * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per 
source, since we
-   * specify a parallelism of 1 and also at runtime tell the source that it 
has 1 parallel subtask.
-   */
-  @Test
-  public void testWithOneReader() throws Exception {
-final int numElements = 20;
-final Object checkpointLock = new Object();
-PipelineOptions options = PipelineOptionsFactory.create();
-
-// this source will emit exactly NUM_ELEMENTS across all parallel readers,
-// afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-// elements later.
-TestCountingSource source = new TestCountingSource(numElements);
-UnboundedSourceWrapper, 
TestCountingSource.CounterMark> flinkWrapper =
-new UnboundedSourceWrapper<>(options, source, 1);
-
-assertEquals(1, flinkWrapper.getSplitSources().size());
-
-StreamSource<
-WindowedValue>,
-UnboundedSourceWrapper<
-KV,
-TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
-
-setupSourceOperator(sourceOperator);
-
-
-try {
-  sourceOperator.run(checkpointLock,
-  new Output

[3/5] incubator-beam git commit: This closes #924

2016-09-09 Thread mxm
This closes #924


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c78db9ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c78db9ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c78db9ad

Branch: refs/heads/master
Commit: c78db9addf0b08b1b4a3ca4ec5e3e7f3a0899a02
Parents: a96ea98 7e2820b
Author: Maximilian Michels 
Authored: Fri Sep 9 16:07:57 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 9 16:07:57 2016 +0200

--
 .../wrappers/streaming/io/BoundedSourceWrapper.java | 9 -
 .../wrappers/streaming/io/UnboundedSourceWrapper.java   | 8 +++-
 2 files changed, 15 insertions(+), 2 deletions(-)
--




[2/4] incubator-beam git commit: [BEAM-617][flink] introduce option to set state backend

2016-09-09 Thread mxm
[BEAM-617][flink] introduce option to set state backend


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d4f85912
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d4f85912
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d4f85912

Branch: refs/heads/master
Commit: d4f85912effd2c04cac99d693a87bf6e2d597e9c
Parents: be689df
Author: Maximilian Michels 
Authored: Tue Sep 6 16:25:32 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 9 15:59:48 2016 +0200

--
 .../runners/flink/FlinkPipelineExecutionEnvironment.java |  7 +++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java  | 11 +++
 2 files changed, 18 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index a5d33b4..391c3f2 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
@@ -221,6 +222,12 @@ public class FlinkPipelineExecutionEnvironment {
   flinkStreamEnv.enableCheckpointing(checkpointInterval);
 }
 
+// State backend
+final AbstractStateBackend stateBackend = options.getStateBackend();
+if (stateBackend != null) {
+  flinkStreamEnv.setStateBackend(stateBackend);
+}
+
 return flinkStreamEnv;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 1fb23ec..a067e76 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 
 /**
  * Options which can be used to configure a Flink PipelineRunner.
@@ -82,4 +83,14 @@ public interface FlinkPipelineOptions
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
 
+  /**
+   * Sets a state backend to store Beam's state during computation.
+   * Note: Only applicable when executing in streaming mode.
+   * @param stateBackend The state backend to use
+   */
+  @Description("Sets the state backend to use in streaming mode. "
+  + "Otherwise the default is read from the Flink config.")
+  void setStateBackend(AbstractStateBackend stateBackend);
+  AbstractStateBackend getStateBackend();
+
 }
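
A hypothetical usage sketch of the new option (the job setup details are made up
for illustration; MemoryStateBackend is org.apache.flink.runtime.state.memory.MemoryStateBackend):

    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    options.setStateBackend(new MemoryStateBackend());  // only honored in streaming mode
    Pipeline pipeline = Pipeline.create(options);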



[1/4] incubator-beam git commit: [flink] use exploded WindowValue in FlinkDoFnFunction

2016-09-09 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 817515fe4 -> a96ea98a4


[flink] use exploded WindowValue in FlinkDoFnFunction


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3461ce21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3461ce21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3461ce21

Branch: refs/heads/master
Commit: 3461ce21b8b88de18154de777e21dc7af889f2c7
Parents: 26635d7
Author: Maximilian Michels 
Authored: Wed Sep 7 14:49:02 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Sep 7 14:49:02 2016 +0200

--
 .../runners/flink/translation/functions/FlinkDoFnFunction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3461ce21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index ac5b345..798a23c 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -86,7 +86,7 @@ public class FlinkDoFnFunction
   // is in only one window
   for (WindowedValue value : values) {
 for (WindowedValue explodedValue : value.explodeWindows()) {
-  context = context.forWindowedValue(value);
+  context = context.forWindowedValue(explodedValue);
   doFn.processElement(context);
 }
   }
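
The one-line fix above matters because `explodeWindows()` produces one single-window copy per window the element belongs to; passing the original multi-window `value` handed the DoFn the same unexploded element on every iteration. A small, self-contained sketch of the behaviour (the element, windows, and timestamps are made up for illustration):

```java
import java.util.Arrays;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;

public class ExplodeWindowsExample {
  public static void main(String[] args) {
    IntervalWindow w1 = new IntervalWindow(new Instant(0), new Instant(10));
    IntervalWindow w2 = new IntervalWindow(new Instant(5), new Instant(15));

    // one element that is assigned to two windows at once
    WindowedValue<String> value =
        WindowedValue.of("hello", new Instant(7), Arrays.asList(w1, w2), PaneInfo.NO_FIRING);

    // explodeWindows() yields one single-window WindowedValue per window
    for (WindowedValue<String> exploded : value.explodeWindows()) {
      System.out.println(exploded.getWindows() + " -> " + exploded.getValue());
    }
  }
}
```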



[3/4] incubator-beam git commit: This closes #923

2016-09-09 Thread mxm
This closes #923


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0399dbc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0399dbc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0399dbc7

Branch: refs/heads/master
Commit: 0399dbc7a843e95ceacf9eff9fc751751f8f4bcc
Parents: 817515f d4f8591
Author: Maximilian Michels 
Authored: Fri Sep 9 16:00:33 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 9 16:00:33 2016 +0200

--
 .../runners/flink/FlinkPipelineExecutionEnvironment.java |  7 +++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java  | 11 +++
 2 files changed, 18 insertions(+)
--




[4/4] incubator-beam git commit: This closes #928

2016-09-09 Thread mxm
This closes #928


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a96ea98a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a96ea98a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a96ea98a

Branch: refs/heads/master
Commit: a96ea98a48c2fc7e95bdb6265ccf421355584c4d
Parents: 0399dbc 3461ce2
Author: Maximilian Michels 
Authored: Fri Sep 9 16:01:29 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Sep 9 16:01:29 2016 +0200

--
 .../runners/flink/translation/functions/FlinkDoFnFunction.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/3] incubator-beam-site git commit: [BEAM-102] update capability matrix

2016-09-08 Thread mxm
Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site e2430eb4d -> dcdd8b742


[BEAM-102] update capability matrix


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/8459da13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/8459da13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/8459da13

Branch: refs/heads/asf-site
Commit: 8459da13fccbd16e850ea455873812eea974b6dc
Parents: e2430eb
Author: Maximilian Michels 
Authored: Mon Sep 5 13:04:49 2016 +0200
Committer: Maximilian Michels 
Committed: Thu Sep 8 17:24:27 2016 +0200

--
 _data/capability-matrix.yml | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/8459da13/_data/capability-matrix.yml
--
diff --git a/_data/capability-matrix.yml b/_data/capability-matrix.yml
index e2f66b9..89da73b 100644
--- a/_data/capability-matrix.yml
+++ b/_data/capability-matrix.yml
@@ -118,12 +118,11 @@ categories:
   - class: dataflow
 l1: 'Yes'
 l2: some size restrictions in streaming
-l3: Batch implemented supports a distributed implementation, but 
streaming mode may force some size restrictions. Neither mode is able to push 
lookups directly up into key-based sources.
+l3: Batch mode supports a distributed implementation, but 
streaming mode may force some size restrictions. Neither mode is able to push 
lookups directly up into key-based sources.
   - class: flink
-jira: BEAM-102
-l1: 'Partially'
-l2: no supported in streaming
-l3: Supported in batch. Side inputs for streaming are currently 
WiP.
+l1: 'Yes'
+l2: some size restrictions in streaming
+l3: Batch mode supports a distributed implementation, but 
streaming mode may force some size restrictions. Neither mode is able to push 
lookups directly up into key-based sources.
   - class: spark
 l1: 'Partially'
 l2: not supported in streaming



[3/3] incubator-beam-site git commit: This closes #39

2016-09-08 Thread mxm
This closes #39


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/dcdd8b74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/dcdd8b74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/dcdd8b74

Branch: refs/heads/asf-site
Commit: dcdd8b742bd5f1463815638ef1c33aca3a523308
Parents: e2430eb bb1106b
Author: Maximilian Michels 
Authored: Thu Sep 8 17:24:51 2016 +0200
Committer: Maximilian Michels 
Committed: Thu Sep 8 17:24:51 2016 +0200

--
 _data/capability-matrix.yml| 9 -
 content/learn/runners/capability-matrix/index.html | 8 
 2 files changed, 8 insertions(+), 9 deletions(-)
--




[GitHub] incubator-beam pull request #929: [flink] initialize watermarkTimeQueue with...

2016-09-07 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/929

[flink] initialize watermarkTimeQueue with Comparator

This mitigates a ClassCastException caused by queue elements that do not implement Comparable.

CC @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam fix2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/929.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #929






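The actual `watermarkTimeQueue` field is not shown in this message, but the failure mode is the standard one for `java.util.PriorityQueue`: without an explicit `Comparator`, the queue casts its elements to `Comparable` on insertion. A generic, hypothetical illustration (the `Timer` holder class below is made up for the example):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class ComparatorQueueExample {

  /** Hypothetical holder type that does NOT implement Comparable. */
  static class Timer {
    final long timestamp;
    Timer(long timestamp) { this.timestamp = timestamp; }
  }

  public static void main(String[] args) {
    // Without the Comparator argument, the second offer() would throw
    // "java.lang.ClassCastException: Timer cannot be cast to java.lang.Comparable".
    PriorityQueue<Timer> queue = new PriorityQueue<>(11, new Comparator<Timer>() {
      @Override
      public int compare(Timer a, Timer b) {
        return Long.compare(a.timestamp, b.timestamp);
      }
    });

    queue.offer(new Timer(42L));
    queue.offer(new Timer(7L));
    System.out.println(queue.poll().timestamp); // prints 7
  }
}
```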


[GitHub] incubator-beam pull request #928: [flink] use exploded WindowValue in FlinkD...

2016-09-07 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/928

[flink] use exploded WindowValue in FlinkDoFnFunction

CC @aljoscha 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/928.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #928








[GitHub] incubator-beam pull request #924: [BEAM-333][flink] make unbounded sources s...

2016-09-06 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/924

[BEAM-333][flink] make unbounded sources stoppable




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-333

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #924


commit 557f1fe909047a72492faa67c6f7d2d24c3cf729
Author: Maximilian Michels <m...@apache.org>
Date:   2016-09-06T14:38:43Z

[BEAM-333][flink] make unbounded sources stoppable




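The wrapper changes themselves are not included in this message. As a rough sketch of what "stoppable" means on the Flink side, assuming Flink 1.x's `StoppableFunction` interface (the counter source below is hypothetical and is not the Beam `UnboundedSourceWrapper`):

```java
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class StoppableCounterSource implements SourceFunction<Long>, StoppableFunction {

  private volatile boolean running = true;
  private long counter = 0L;

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
    while (running) {
      synchronized (ctx.getCheckpointLock()) {
        ctx.collect(counter++);
      }
      Thread.sleep(10);
    }
  }

  @Override
  public void stop() {
    // graceful stop requested by the job manager: leave the loop and let run() return
    running = false;
  }

  @Override
  public void cancel() {
    // hard cancellation: same effect in this simple sketch
    running = false;
  }
}
```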


[GitHub] incubator-beam pull request #923: [BEAM-617][flink] introduce option to set ...

2016-09-06 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/923

[BEAM-617][flink] introduce option to set state backend

CC @aljoscha 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-617

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #923


commit 2eba5034e7d5abbae737025a125805af7744aefd
Author: Maximilian Michels <m...@apache.org>
Date:   2016-09-06T14:25:32Z

[BEAM-617][flink] introduce option to set state backend






[GitHub] incubator-beam-site pull request #39: [BEAM-102] update capability matrix

2016-09-05 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam-site/pull/39

[BEAM-102] update capability matrix

This updates the matrix to the most recent development status.

CC @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam-site asf-site

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam-site/pull/39.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #39


commit a36d8c54c32445485cc01d69d9e75cc76a227ebe
Author: Maximilian Michels <m...@apache.org>
Date:   2016-09-05T11:04:49Z

[BEAM-102] update capability matrix

commit 3afa3793f134d94e5d19bea650b62a0b616d5933
Author: Maximilian Michels <m...@apache.org>
Date:   2016-09-05T11:05:49Z

build website






[GitHub] incubator-beam pull request #821: [flink] add missing maven config to exampl...

2016-08-12 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/821

[flink] add missing maven config to example pom



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/821.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #821


commit e21b1c594f4307b2bc5e615d40e1d67f209c527b
Author: Maximilian Michels <m...@apache.org>
Date:   2016-08-12T15:51:02Z

[flink] add missing maven config to example pom






[1/2] incubator-beam git commit: [flink] improve example section in README

2016-07-26 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 267136fb6 -> d02d2de09


[flink] improve example section in README

- updates the README
- repairs broken exec configuration


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2fe38770
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2fe38770
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2fe38770

Branch: refs/heads/master
Commit: 2fe387707d1e115b578f5ee643bb99c0e4667ee0
Parents: cf14644
Author: Maximilian Michels 
Authored: Wed Jul 20 16:06:06 2016 +0200
Committer: Maximilian Michels 
Committed: Mon Jul 25 17:30:19 2016 +0200

--
 runners/flink/README.md | 25 
 runners/flink/examples/pom.xml  | 11 -
 .../beam/runners/flink/examples/WordCount.java  |  4 ++--
 3 files changed, 21 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/README.md
--
diff --git a/runners/flink/README.md b/runners/flink/README.md
index 3348119..aeb1692 100644
--- a/runners/flink/README.md
+++ b/runners/flink/README.md
@@ -109,35 +109,40 @@ Next, let's run the classic WordCount example. It's 
semantically identically to
 the example provided with Apache Beam. Only this time, we chose the
 `FlinkRunner` to execute the WordCount on top of Flink.
 
-Here's an excerpt from the WordCount class file:
+Here's an excerpt from the [WordCount class 
file](examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java):
 
 ```java
-Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
+Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
 // yes, we want to run WordCount with Flink
 options.setRunner(FlinkRunner.class);
 
 Pipeline p = Pipeline.create(options);
 
-p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
-   .apply(new CountWords())
-   .apply(TextIO.Write.named("WriteCounts")
-   .to(options.getOutput())
-   .withNumShards(options.getNumShards()));
+p.apply("ReadLines", TextIO.Read.from(options.getInput()))
+.apply(new CountWords())
+.apply(MapElements.via(new FormatAsTextFn()))
+.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
 p.run();
 ```
 
 To execute the example, let's first get some sample data:
 
-curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > 
examples/kinglear.txt
+cd runners/flink/examples
+curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt
 
 Then let's run the included WordCount locally on your machine:
 
-cd examples
-mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt
+cd runners/flink/examples
+mvn exec:java 
-Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount \
+  -Dinput=kinglear.txt -Doutput=wordcounts.txt
 
 Congratulations, you have run your first Apache Beam program on top of Apache 
Flink!
 
+Note, that you will find a number of `wordcounts*` output files because Flink 
parallelizes the
+WordCount computation. You may pass an additional `-Dparallelism=1` to disable 
parallelization and
+get a single `wordcounts.txt` file.
 
 # Running Beam programs on a Flink cluster
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/examples/pom.xml
--
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index b0ee2ed..355a6be 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -33,11 +33,10 @@
   jar
 
   
-
-org.apache.beam.runners.flink.examples.WordCount
+
 kinglear.txt
 wordcounts.txt
-1
+-1
   
 
   
@@ -131,12 +130,10 @@
 
   java
   
--classpath
-
-${clazz}
+
--runner=org.apache.beam.runners.flink.FlinkRunner
+--parallelism=${parallelism}
 --input=${input}
 --output=${output}
---parallelism=${parallelism}
   
 
   

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
--
diff --git 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 

[2/2] incubator-beam git commit: This closes #724

2016-07-26 Thread mxm
This closes #724


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d02d2de0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d02d2de0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d02d2de0

Branch: refs/heads/master
Commit: d02d2de094dda4c8fcc30e1964b4e2c514e2f557
Parents: 267136f 2fe3877
Author: Maximilian Michels 
Authored: Tue Jul 26 12:03:29 2016 +0200
Committer: Maximilian Michels 
Committed: Tue Jul 26 12:04:34 2016 +0200

--
 runners/flink/README.md | 25 
 runners/flink/examples/pom.xml  | 11 -
 .../beam/runners/flink/examples/WordCount.java  |  4 ++--
 3 files changed, 21 insertions(+), 19 deletions(-)
--




[GitHub] incubator-beam pull request #724: [flink] improve example section in README

2016-07-25 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/724

[flink] improve example section in README

- updates the README
- repairs broken exec configuration

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam README

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/724.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #724


commit 2f07d4e6d5b801860be804a808ec5c87fe067c5a
Author: Maximilian Michels <m...@apache.org>
Date:   2016-07-20T14:06:06Z

[flink] improve example section in README

- updates the README
- repairs broken exec configuration






[2/2] incubator-beam git commit: This closes #450

2016-06-13 Thread mxm
This closes #450


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be05942d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be05942d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be05942d

Branch: refs/heads/master
Commit: be05942da0f09a247e195d1d29513ae40e1a95e0
Parents: 60964b6 a2abc6a
Author: Maximilian Michels 
Authored: Mon Jun 13 14:57:31 2016 +0200
Committer: Maximilian Michels 
Committed: Mon Jun 13 14:57:31 2016 +0200

--
 .../wrappers/streaming/FlinkAbstractParDoWrapper.java   | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--




[1/2] incubator-beam git commit: [flink] fix potential NPE in ParDoWrapper

2016-06-13 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 60964b611 -> be05942da


[flink] fix potential NPE in ParDoWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a2abc6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a2abc6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a2abc6a2

Branch: refs/heads/master
Commit: a2abc6a249cdc4e6000d1539df6c3b5cde8d39b0
Parents: 60964b6
Author: Maximilian Michels 
Authored: Fri Jun 10 14:26:45 2016 +0200
Committer: Maximilian Michels 
Committed: Mon Jun 13 14:56:54 2016 +0200

--
 .../wrappers/streaming/FlinkAbstractParDoWrapper.java   | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2abc6a2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index a935011..3c37aa9 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -70,18 +70,21 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl
 
   @Override
   public void open(Configuration parameters) throws Exception {
-this.doFn.startBundle(context);
   }
 
   @Override
   public void close() throws Exception {
-this.doFn.finishBundle(context);
+if (this.context != null) {
+  // we have initialized the context
+  this.doFn.finishBundle(this.context);
+}
   }
 
   @Override
   public void flatMap(WindowedValue value, Collector 
out) throws Exception {
 if (this.context == null) {
   this.context = new DoFnProcessContext(doFn, out);
+  this.doFn.startBundle(this.context);
 }
 
 // for each window the element belongs to, create a new copy here.
@@ -98,7 +101,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl
 
   private void processElement(WindowedValue value) throws Exception {
 this.context.setElement(value);
-doFn.processElement(context);
+doFn.processElement(this.context);
   }
 
   private class DoFnProcessContext extends DoFn.ProcessContext {



[GitHub] incubator-beam pull request #450: [flink] fix potential NPE in ParDoWrapper

2016-06-13 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/450

[flink] fix potential NPE in ParDoWrapper

Just discovered this while checking for correct execution of the bundle 
life cycle. This fixes potential NPEs in the ParDo translation wrapper. 
`startBundle(context)` receives `null` as `context` before the first element 
has been read. Similarly, `finishBundle(context)` receives `null` as `context` 
if no elements have been read.

CC @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/450.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #450


commit ef9f4c0bb4aecde5a99cd80e96223de306a05455
Author: Maximilian Michels <m...@apache.org>
Date:   2016-06-13T12:26:45Z

[flink] fix potential NPE in ParDoWrapper






[1/5] incubator-beam git commit: [BEAM-196] provide PipelineOptions in DoFn

2016-06-08 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master ffbfc66e1 -> cc448e976


[BEAM-196] provide PipelineOptions in DoFn

- fixes NPE when accessing the PipelineOptions
- adds a test to verify that the PipelineOptions are available


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eced106e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eced106e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eced106e

Branch: refs/heads/master
Commit: eced106e50ddb257524a7826ab7d27254be89da8
Parents: d10ae23
Author: Maximilian Michels 
Authored: Tue Jun 7 13:57:33 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Jun 8 15:19:50 2016 +0200

--
 .../streaming/FlinkAbstractParDoWrapper.java| 11 ++-
 .../beam/runners/flink/PipelineOptionsTest.java | 97 +++-
 2 files changed, 100 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 117303c..a935011 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -37,6 +38,7 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.joda.time.format.PeriodFormat;
@@ -52,7 +54,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl
 
   private final DoFn doFn;
   private final WindowingStrategy windowingStrategy;
-  private transient PipelineOptions options;
+  private final SerializedPipelineOptions serializedPipelineOptions;
 
   private DoFnProcessContext context;
 
@@ -62,7 +64,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl
 Preconditions.checkNotNull(doFn);
 
 this.doFn = doFn;
-this.options = options;
+this.serializedPipelineOptions = new SerializedPipelineOptions(options);
 this.windowingStrategy = windowingStrategy;
   }
 
@@ -107,7 +109,8 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl
 
 private WindowedValue element;
 
-private DoFnProcessContext(DoFn function, 
Collector outCollector) {
+private DoFnProcessContext(DoFn function,
+  Collector outCollector) {
   function.super();
   super.setupDelegateAggregators();
 
@@ -156,7 +159,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl
 
 @Override
 public PipelineOptions getPipelineOptions() {
-  return options;
+  return serializedPipelineOptions.getPipelineOptions();
 }
 
 @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eced106e/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 464c6df..d571f31 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -18,14 +18,29 @@
 package org.apache.beam.runners.flink;
 
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import 
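
The diff above swaps the transient `PipelineOptions` field for a `SerializedPipelineOptions` holder so the options survive serialization to the Flink workers. A simplified, stand-alone sketch of that pattern — not the actual runner class — assuming Jackson can round-trip `PipelineOptions` (which Beam's options support out of the box):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.options.PipelineOptions;

// Keep a byte[] (which Java serialization can ship to the workers) and rebuild
// the options lazily on first access.
public class SerializedOptionsSketch implements Serializable {

  private final byte[] serializedOptions;
  private transient PipelineOptions options; // rebuilt on the worker, hence transient

  public SerializedOptionsSketch(PipelineOptions options) {
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      new ObjectMapper().writeValue(baos, options);
      this.serializedOptions = baos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Couldn't serialize PipelineOptions", e);
    }
  }

  public PipelineOptions getPipelineOptions() {
    if (options == null) {
      try {
        options = new ObjectMapper()
            .readValue(new ByteArrayInputStream(serializedOptions), PipelineOptions.class);
      } catch (IOException e) {
        throw new RuntimeException("Couldn't deserialize PipelineOptions", e);
      }
    }
    return options;
  }
}
```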

[4/5] incubator-beam git commit: [BEAM-287] adjust README to changed Maven layout

2016-06-08 Thread mxm
[BEAM-287] adjust README to changed Maven layout


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fd0dfc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fd0dfc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fd0dfc7

Branch: refs/heads/master
Commit: 1fd0dfc7ad61078259a5ad43b37d42b873d40090
Parents: ffbfc66
Author: Maximilian Michels 
Authored: Fri Jun 3 11:13:45 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Jun 8 18:08:34 2016 +0200

--
 runners/flink/README.md | 7 +--
 1 file changed, 1 insertion(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fd0dfc7/runners/flink/README.md
--
diff --git a/runners/flink/README.md b/runners/flink/README.md
index cce17a2..69e2abb 100644
--- a/runners/flink/README.md
+++ b/runners/flink/README.md
@@ -163,7 +163,7 @@ The contents of the root `pom.xml` should be slightly 
changed aftewards (explana
   
 
   org.apache.beam
-  flink-runner_2.10
+  beam-runners-flink_2.10
   0.2.0-incubating-SNAPSHOT
 
   
@@ -186,11 +186,6 @@ The contents of the root `pom.xml` should be slightly 
changed aftewards (explana
   
org.apache.beam.runners.flink.examples.WordCount
 
   
-  
-
-  org.apache.flink:*
-
-  
 
   
 



[2/5] incubator-beam git commit: [flink] improve lifecycle of ParDoBoundWrapper

2016-06-08 Thread mxm
[flink] improve lifecycle of ParDoBoundWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d10ae23c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d10ae23c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d10ae23c

Branch: refs/heads/master
Commit: d10ae23c9bc9529d04d02951bfed01bbf2957773
Parents: ffbfc66
Author: Maximilian Michels 
Authored: Mon Jun 6 12:40:50 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Jun 8 15:19:50 2016 +0200

--
 .../streaming/FlinkAbstractParDoWrapper.java  | 18 +++---
 1 file changed, 11 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d10ae23c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index bb6ed67..117303c 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -66,15 +66,21 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl
 this.windowingStrategy = windowingStrategy;
   }
 
-  private void initContext(DoFn function, 
Collector outCollector) {
-if (this.context == null) {
-  this.context = new DoFnProcessContext(function, outCollector);
-}
+  @Override
+  public void open(Configuration parameters) throws Exception {
+this.doFn.startBundle(context);
+  }
+
+  @Override
+  public void close() throws Exception {
+this.doFn.finishBundle(context);
   }
 
   @Override
   public void flatMap(WindowedValue value, Collector 
out) throws Exception {
-this.initContext(doFn, out);
+if (this.context == null) {
+  this.context = new DoFnProcessContext(doFn, out);
+}
 
 // for each window the element belongs to, create a new copy here.
 Collection windows = value.getWindows();
@@ -90,9 +96,7 @@ public abstract class FlinkAbstractParDoWrapper extends RichFl
 
   private void processElement(WindowedValue value) throws Exception {
 this.context.setElement(value);
-this.doFn.startBundle(context);
 doFn.processElement(context);
-this.doFn.finishBundle(context);
   }
 
   private class DoFnProcessContext extends DoFn.ProcessContext {



[5/5] incubator-beam git commit: This closes #415

2016-06-08 Thread mxm
This closes #415


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc448e97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc448e97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc448e97

Branch: refs/heads/master
Commit: cc448e976d46a8a9341445e152b3b31ea8968a56
Parents: f5583cf 1fd0dfc
Author: Maximilian Michels 
Authored: Wed Jun 8 18:09:03 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Jun 8 18:09:03 2016 +0200

--
 runners/flink/README.md | 7 +--
 1 file changed, 1 insertion(+), 6 deletions(-)
--




[3/5] incubator-beam git commit: This closes #432

2016-06-08 Thread mxm
This closes #432


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5583cfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5583cfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5583cfa

Branch: refs/heads/master
Commit: f5583cfad6c857d919995edd385adaf0f41fd676
Parents: ffbfc66 eced106
Author: Maximilian Michels 
Authored: Wed Jun 8 18:04:50 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Jun 8 18:04:50 2016 +0200

--
 .../streaming/FlinkAbstractParDoWrapper.java| 29 +++---
 .../beam/runners/flink/PipelineOptionsTest.java | 97 +++-
 2 files changed, 111 insertions(+), 15 deletions(-)
--




[GitHub] incubator-beam pull request #432: [BEAM-196] Additional fix to ensure the Pi...

2016-06-08 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/432

[BEAM-196] Additional fix to ensure the PipelineOptions are available in 
DoFns.

Fixes a NullPointerException if `PipelineOptions` are accessed inside a DoFn 
and backs it up with a test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/432.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #432


commit 05287c4ddcc38378e4e8cd921b6f6394e43eed75
Author: Maximilian Michels <m...@apache.org>
Date:   2016-06-07T11:57:33Z

[BEAM-196] fix NPE in ParDoWrapper

commit 595146d5759a6f631f057050cdbe71f9849035f9
Author: Maximilian Michels <m...@apache.org>
Date:   2016-06-08T10:40:50Z

[flink] improve lifecycle of ParDoBoundWrapper






[2/2] incubator-beam git commit: This closes #398

2016-05-31 Thread mxm
This closes #398


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ffecfda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ffecfda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ffecfda

Branch: refs/heads/master
Commit: 2ffecfda2313a609f6db70b942d6d9a8984f464a
Parents: 1cd64bb 9706438
Author: Maximilian Michels 
Authored: Tue May 31 11:13:50 2016 +0200
Committer: Maximilian Michels 
Committed: Tue May 31 11:13:50 2016 +0200

--
 .../translation/FlinkBatchPipelineTranslator.java   | 16 +---
 1 file changed, 1 insertion(+), 15 deletions(-)
--




[2/4] incubator-beam git commit: [BEAM-235] use streaming mode on unbounded sources

2016-05-30 Thread mxm
[BEAM-235] use streaming mode on unbounded sources

This change automatically discovers the execution mode of the Pipeline
during a preliminary "optimization" translation of the pipeline. When
unbounded sources are discovered, the pipeline translation mode is
switched to streaming.

Users may still supply the streaming flag to override this
behavior. Users who forget to supply the flag will automatically use
streaming mode whenever they use unbounded sources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5632bbf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5632bbf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5632bbf8

Branch: refs/heads/master
Commit: 5632bbf8a85a7d53fcaa53535030c4b406d8a09a
Parents: cca2577
Author: Maximilian Michels 
Authored: Mon May 30 09:59:12 2016 +0200
Committer: Maximilian Michels 
Committed: Mon May 30 12:11:24 2016 +0200

--
 runners/flink/README.md |   9 +-
 .../FlinkPipelineExecutionEnvironment.java  | 149 +++
 .../beam/runners/flink/FlinkPipelineRunner.java |  13 +-
 .../FlinkBatchPipelineTranslator.java   |   8 -
 .../translation/FlinkPipelineTranslator.java|  17 +++
 .../FlinkStreamingPipelineTranslator.java   |  10 --
 .../PipelineTranslationOptimizer.java   |  73 +
 .../flink/translation/TranslationMode.java  |  31 
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 9 files changed, 186 insertions(+), 126 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/README.md
--
diff --git a/runners/flink/README.md b/runners/flink/README.md
index 7418f16..457e2a6 100644
--- a/runners/flink/README.md
+++ b/runners/flink/README.md
@@ -27,11 +27,14 @@ and sinks or use the provided support for Apache Kafka.
 
 ### Seamless integration
 
-To execute a Beam program in streaming mode, just enable streaming in the 
`PipelineOptions`:
+The Flink Runner decides to use batch or streaming execution mode based on 
whether programs use
+unbounded sources. When unbounded sources are used, it executes in streaming 
mode, otherwise it
+uses the batch execution mode.
 
-options.setStreaming(true);
+If you wish to explicitly enable streaming mode, please set the streaming flag 
in the
+`PipelineOptions`:
 
-That's it. If you prefer batched execution, simply disable streaming mode.
+options.setStreaming(true);
 
 ## Batch
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index d31d790..4cd8fb3 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink;
 import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
 import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator;
 import 
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator;
+import org.apache.beam.runners.flink.translation.PipelineTranslationOptimizer;
+import org.apache.beam.runners.flink.translation.TranslationMode;
 import org.apache.beam.sdk.Pipeline;
 
 import com.google.common.base.Preconditions;
@@ -39,7 +41,7 @@ import java.util.List;
  * Depending on if the job is a Streaming or Batch processing one, it creates
  * the adequate execution environment ({@link ExecutionEnvironment} or {@link 
StreamExecutionEnvironment}),
  * the necessary {@link FlinkPipelineTranslator} ({@link 
FlinkBatchPipelineTranslator} or
- * {@link FlinkStreamingPipelineTranslator})to transform the Beam job into a 
Flink one, and
+ * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a 
Flink one, and
  * executes the (translated) job.
  */
 public class FlinkPipelineExecutionEnvironment {
@@ -57,7 +59,6 @@ public class FlinkPipelineExecutionEnvironment {
*/
   private ExecutionEnvironment flinkBatchEnv;
 
-
   /**
* The Flink Streaming execution environment. This is instantiated to either 
a
* {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} 
or
@@ -67,51 +68,13 @@ public class 
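
A condensed, hypothetical distillation of the detection logic described above — the real code walks the pipeline with a pipeline visitor (`PipelineTranslationOptimizer`) before translation and still lets a user-supplied streaming flag win. The class below is illustration only, not the actual optimizer:

```java
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;

public class TranslationModeDetector {

  /** Execution mode of the translated Flink job. */
  public enum TranslationMode { BATCH, STREAMING }

  private TranslationMode mode = TranslationMode.BATCH;

  /** Called for every transform seen while walking the pipeline. */
  public void visit(PTransform<?, ?> transform) {
    if (transform instanceof Read.Unbounded) {
      // an unbounded source forces streaming execution
      mode = TranslationMode.STREAMING;
    }
  }

  public TranslationMode getTranslationMode() {
    return mode;
  }
}
```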

[1/4] incubator-beam git commit: [Beam-312] don't checkpoint if CheckpointCoder not available

2016-05-30 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master cca2577c6 -> 36a27f538


[Beam-312] don't checkpoint if CheckpointCoder not available

This skips the checkpoint logic in the UnboundedSourceWrapper if the
UnboundedSource doesn't supply a CheckpointMarkCoder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c4072ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c4072ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c4072ad

Branch: refs/heads/master
Commit: 9c4072ad87f25248f77e437e5bcf674aff19982b
Parents: cca2577
Author: Maximilian Michels 
Authored: Mon May 30 15:59:12 2016 +0200
Committer: Maximilian Michels 
Committed: Sat May 28 16:17:15 2016 +0200

--
 .../streaming/io/UnboundedSourceWrapper.java| 24 +---
 1 file changed, 21 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c4072ad/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index b816e2a..7f26a65 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -120,10 +120,17 @@ public class UnboundedSourceWrapper<
 }
 
 Coder checkpointMarkCoder = 
source.getCheckpointMarkCoder();
-Coder> sourceCoder =
-SerializableCoder.of(new TypeDescriptor>() {});
+if (checkpointMarkCoder == null) {
+  LOG.info("No CheckpointMarkCoder specified for this source. Won't create 
snapshots.");
+  checkpointCoder = null;
+} else {
+
+  Coder> sourceCoder =
+  SerializableCoder.of(new TypeDescriptor>() {
+  });
 
-checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, 
checkpointMarkCoder));
+  checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, 
checkpointMarkCoder));
+}
 
 // get the splits early. we assume that the generated splits are stable,
 // this is necessary so that the mapping of state to source is correct
@@ -308,6 +315,12 @@ public class UnboundedSourceWrapper<
 
   @Override
   public byte[] snapshotState(long l, long l1) throws Exception {
+
+if (checkpointCoder == null) {
+  // no checkpoint coder available in this source
+  return null;
+}
+
 // we checkpoint the sources along with the CheckpointMarkT to ensure
 // than we have a correct mapping of checkpoints to sources when
 // restoring
@@ -333,6 +346,11 @@ public class UnboundedSourceWrapper<
 
   @Override
   public void restoreState(byte[] bytes) throws Exception {
+if (checkpointCoder == null) {
+  // no checkpoint coder available in this source
+  return;
+}
+
 try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
   restoredState = checkpointCoder.decode(bais, Coder.Context.OUTER);
 }



[3/4] incubator-beam git commit: This closes #395

2016-05-30 Thread mxm
This closes #395


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5d2f9cd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5d2f9cd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5d2f9cd2

Branch: refs/heads/master
Commit: 5d2f9cd29b1ea70b3e6bbd5c884260c3695262d3
Parents: cca2577 9c4072a
Author: Maximilian Michels 
Authored: Mon May 30 12:18:00 2016 +0200
Committer: Maximilian Michels 
Committed: Mon May 30 12:18:00 2016 +0200

--
 .../streaming/io/UnboundedSourceWrapper.java| 24 +---
 1 file changed, 21 insertions(+), 3 deletions(-)
--




[4/4] incubator-beam git commit: This closes #394

2016-05-30 Thread mxm
This closes #394


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/36a27f53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/36a27f53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/36a27f53

Branch: refs/heads/master
Commit: 36a27f53836475048db884aeaa70d496001f1f41
Parents: 5d2f9cd 5632bbf
Author: Maximilian Michels 
Authored: Mon May 30 12:18:18 2016 +0200
Committer: Maximilian Michels 
Committed: Mon May 30 12:18:18 2016 +0200

--
 runners/flink/README.md |   9 +-
 .../FlinkPipelineExecutionEnvironment.java  | 149 +++
 .../beam/runners/flink/FlinkPipelineRunner.java |  13 +-
 .../FlinkBatchPipelineTranslator.java   |   8 -
 .../translation/FlinkPipelineTranslator.java|  17 +++
 .../FlinkStreamingPipelineTranslator.java   |  10 --
 .../PipelineTranslationOptimizer.java   |  73 +
 .../flink/translation/TranslationMode.java  |  31 
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 9 files changed, 186 insertions(+), 126 deletions(-)
--




[GitHub] incubator-beam pull request: [Beam-312] don't checkpoint if Checkp...

2016-05-28 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/395

[Beam-312] don't checkpoint if CheckpointCoder not available

This skips the checkpoint logic in the UnboundedSourceWrapper if the
UnboundedSource doesn't supply a CheckpointMarkCoder.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-312

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #395


commit 9c4072ad87f25248f77e437e5bcf674aff19982b
Author: Maximilian Michels <m...@apache.org>
Date:   2016-05-30T13:59:12Z

[Beam-312] don't checkpoint if CheckpointCoder not available

This skips the checkpoint logic in the UnboundedSourceWrapper if the
UnboundedSource doesn't supply a CheckpointMarkCoder.






[GitHub] incubator-beam pull request: [BEAM-235] use streaming mode on unbo...

2016-05-28 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/394

[BEAM-235] use streaming mode on unbounded sources

This change automatically discovers the execution mode of the Pipeline
during a preliminary "optimization" translation of the pipeline. When
unbounded sources are discovered, the pipeline translation mode is
switched to streaming.

Users may still supply the streaming flag to override this
behavior. Users who forget to supply the flag will automatically use
streaming mode whenever they use unbounded sources.

- TODO: update documentation

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-235

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/394.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #394


commit c9c1e732f06dd31b565fbf632634a78d38fa3add
Author: Maximilian Michels <m...@apache.org>
Date:   2016-05-30T07:59:12Z

[BEAM-235] use streaming mode on unbounded sources

This change automatically discovers the execution mode of the Pipeline
during a preliminary "optimization" translation of the pipeline. When
unbounded sources are discovered, the pipeline translation mode is
switched to streaming.

Users may still supply the streaming flag to override this
behavior. Users who forget to supply the flag will automatically use
streaming mode whenever they use unbounded sources.

- TODO: update documentation






[2/3] incubator-beam-site git commit: [BEAM-103] rebuild capability matrix

2016-05-27 Thread mxm
[BEAM-103] rebuild capability matrix


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/15dd578f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/15dd578f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/15dd578f

Branch: refs/heads/asf-site
Commit: 15dd578f15885b51b3fbef1c9deab302814b8f07
Parents: e9937d9
Author: Maximilian Michels 
Authored: Fri May 27 18:39:31 2016 +0200
Committer: Maximilian Michels 
Committed: Fri May 27 18:39:49 2016 +0200

--
 content/capability-matrix/index.html | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/15dd578f/content/capability-matrix/index.html
--
diff --git a/content/capability-matrix/index.html 
b/content/capability-matrix/index.html
index 767df88..9981074 100644
--- a/content/capability-matrix/index.html
+++ b/content/capability-matrix/index.html
@@ -95,7 +95,7 @@
 
   
 Apache Beam Capability 
Matrix
-Last updated: 2016-05-18 20:55 
PDT
+Last updated: 2016-05-27 18:38 
CEST
 
 Apache Beam (incubating) provides a portable API layer for building 
sophisticated data-parallel processing engines that may be executed across a 
diversity of exeuction engines, or runners. The core concepts of this 
layer are based upon the Beam Model (formerly referred to as the http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf;>Dataflow Model), and 
implemented to varying degrees in each Beam runner. To help clarify the 
capabilities of individual runners, we’ve created the capability matrix 
below.
 
@@ -313,7 +313,7 @@
 
 
 
-~ (https://issues.apache.org/jira/browse/BEAM-103;>BEAM-103)
+
 
 
 
@@ -1082,7 +1082,7 @@
 
 
 
-Partially: 
parallelism 1 in streaming(https://issues.apache.org/jira/browse/BEAM-103;>BEAM-103)Fully supported in batch. In streaming, sources currently run with 
parallelism 1.
+Yes: fully 
supported
 
 
 



[3/3] incubator-beam-site git commit: This closes #19

2016-05-27 Thread mxm
This closes #19


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/65770137
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/65770137
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/65770137

Branch: refs/heads/asf-site
Commit: 657701378fc1935a67e5b5a5fe1427997b6c9029
Parents: 92fc275 15dd578
Author: Maximilian Michels 
Authored: Fri May 27 18:41:45 2016 +0200
Committer: Maximilian Michels 
Committed: Fri May 27 18:41:45 2016 +0200

--
 _data/capability-matrix.yml  | 7 +++
 content/capability-matrix/index.html | 6 +++---
 2 files changed, 6 insertions(+), 7 deletions(-)
--




[1/3] incubator-beam-site git commit: [BEAM-103] update capability matrix

2016-05-27 Thread mxm
Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site 92fc27503 -> 657701378


[BEAM-103] update capability matrix

This reflects the changes of BEAM-103 in the Capability Matrix.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/e9937d9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/e9937d9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/e9937d9c

Branch: refs/heads/asf-site
Commit: e9937d9ce610c245ebc00c3b02e9f4dd3cb9e32d
Parents: 92fc275
Author: Maximilian Michels 
Authored: Tue May 24 15:41:21 2016 +0200
Committer: Maximilian Michels 
Committed: Fri May 27 18:38:16 2016 +0200

--
 _data/capability-matrix.yml | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/e9937d9c/_data/capability-matrix.yml
--
diff --git a/_data/capability-matrix.yml b/_data/capability-matrix.yml
index f34527d..e2f66b9 100644
--- a/_data/capability-matrix.yml
+++ b/_data/capability-matrix.yml
@@ -140,10 +140,9 @@ categories:
 l2: fully supported
 l3: 
   - class: flink
-jira: BEAM-103
-l1: 'Partially'
-l2: parallelism 1 in streaming
-l3: Fully supported in batch. In streaming, sources currently run 
with parallelism 1.
+l1: 'Yes'
+l2: fully supported
+l3:
   - class: spark
 l1: 'Yes'
 l2: fully supported



[2/2] incubator-beam git commit: This closes #344

2016-05-17 Thread mxm
This closes #344


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc64d654
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc64d654
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc64d654

Branch: refs/heads/master
Commit: cc64d654c5027c197eb1c1d6f64461edf1dee989
Parents: d627266 9f63000
Author: Maximilian Michels 
Authored: Tue May 17 19:19:12 2016 +0200
Committer: Maximilian Michels 
Committed: Tue May 17 19:19:12 2016 +0200

--
 .../flink/translation/wrappers/SinkOutputFormat.java | 15 +--
 1 file changed, 1 insertion(+), 14 deletions(-)
--




[1/2] incubator-beam git commit: [flink] replace obsolete reflection call

2016-05-17 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master d627266d8 -> cc64d654c


[flink] replace obsolete reflection call


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f630002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f630002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f630002

Branch: refs/heads/master
Commit: 9f630002e235f02042c309e57ea44a163ede8bdf
Parents: d627266
Author: Maximilian Michels 
Authored: Tue May 17 19:12:02 2016 +0200
Committer: Maximilian Michels 
Committed: Tue May 17 19:12:02 2016 +0200

--
 .../flink/translation/wrappers/SinkOutputFormat.java | 15 +--
 1 file changed, 1 insertion(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f630002/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
index 2766a87..53e544d 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
@@ -46,23 +46,10 @@ public class SinkOutputFormat implements OutputFormat 
{
   private AbstractID uid = new AbstractID();
 
   public SinkOutputFormat(Write.Bound transform, PipelineOptions 
pipelineOptions) {
-this.sink = extractSink(transform);
+this.sink = transform.getSink();
 this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
   }
 
-  private Sink extractSink(Write.Bound transform) {
-// TODO possibly add a getter in the upstream
-try {
-  Field sinkField = transform.getClass().getDeclaredField("sink");
-  sinkField.setAccessible(true);
-  @SuppressWarnings("unchecked")
-  Sink extractedSink = (Sink) sinkField.get(transform);
-  return extractedSink;
-} catch (NoSuchFieldException | IllegalAccessException e) {
-  throw new RuntimeException("Could not acquire custom sink field.", e);
-}
-  }
-
   @Override
   public void configure(Configuration configuration) {
 writeOperation = 
sink.createWriteOperation(serializedOptions.getPipelineOptions());



[GitHub] incubator-beam pull request: [flink] replace obsolete reflection c...

2016-05-17 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/344

[flink] replace obsolete reflection call

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [X] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [X] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [X] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [X] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam reflectionCall

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #344


commit 9f630002e235f02042c309e57ea44a163ede8bdf
Author: Maximilian Michels <m...@apache.org>
Date:   2016-05-17T17:12:02Z

[flink] replace obsolete reflection call






[2/2] incubator-beam git commit: This closes #324

2016-05-12 Thread mxm
This closes #324


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/123674f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/123674f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/123674f4

Branch: refs/heads/master
Commit: 123674f4b5a823cc593514a131943fbcc462ab7a
Parents: 6ec9e96 50edd23
Author: Maximilian Michels 
Authored: Thu May 12 10:57:33 2016 +0200
Committer: Maximilian Michels 
Committed: Thu May 12 10:57:33 2016 +0200

--
 runners/flink/runner/pom.xml| 10 ---
 .../runners/flink/FlinkPipelineOptions.java | 30 ++--
 .../beam/runners/flink/FlinkPipelineRunner.java |  4 +--
 3 files changed, 28 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/123674f4/runners/flink/runner/pom.xml
--



[GitHub] incubator-beam pull request: [BEAM-272][flink] remove dependency o...

2016-05-11 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/324

[BEAM-272][flink] remove dependency on Dataflow Runner

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [X] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [X] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [X] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [X] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-272

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #324


commit abfe73a962624001022a83ce4dd5c1697037bdbf
Author: Maximilian Michels <m...@apache.org>
Date:   2016-05-11T09:57:44Z

[BEAM-272][flink] remove dependency on Dataflow Runner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/4] incubator-beam git commit: fix Flink source coder handling

2016-04-29 Thread mxm
fix Flink source coder handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aead96ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aead96ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aead96ff

Branch: refs/heads/master
Commit: aead96ff4c018b96a7b5ab1defb408c2a09b1be7
Parents: bc847a9
Author: Maximilian Michels 
Authored: Thu Apr 28 12:00:18 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Apr 29 17:58:00 2016 +0200

--
 .../FlinkStreamingTransformTranslators.java | 13 +++-
 .../flink/translation/types/FlinkCoder.java | 64 
 .../streaming/io/UnboundedFlinkSource.java  | 12 +++-
 3 files changed, 84 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aead96ff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index db24f9d..618727d 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.translation;
 
 import org.apache.beam.runners.flink.translation.functions.UnionCoder;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.FlinkCoder;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupByKeyWrapper;
@@ -262,9 +263,15 @@ public class FlinkStreamingTransformTranslators {
   DataStream source;
   if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) 
{
 @SuppressWarnings("unchecked")
-UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) 
transform.getSource();
-source = context.getExecutionEnvironment()
-.addSource(flinkSource.getFlinkSource())
+UnboundedFlinkSource flinkSourceFunction = 
(UnboundedFlinkSource) transform.getSource();
+DataStream flinkSource = context.getExecutionEnvironment()
+.addSource(flinkSourceFunction.getFlinkSource());
+
+flinkSourceFunction.setCoder(
+new FlinkCoder(flinkSource.getType(),
+  context.getExecutionEnvironment().getConfig()));
+
+source = flinkSource
 .flatMap(new FlatMapFunction() {
   @Override
   public void flatMap(T s, Collector collector) 
throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aead96ff/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
new file mode 100644
index 000..3b1e66e
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import 

[3/4] incubator-beam git commit: Flink sink implementation

2016-04-29 Thread mxm
Flink sink implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc847a95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc847a95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc847a95

Branch: refs/heads/master
Commit: bc847a9582447372461c5cf35450ba4a4c3d490d
Parents: 4fd9d74
Author: Maximilian Michels 
Authored: Fri Apr 22 12:33:26 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Apr 29 17:58:00 2016 +0200

--
 .../FlinkStreamingTransformTranslators.java |  33 +++-
 .../streaming/io/UnboundedFlinkSink.java| 175 +++
 2 files changed, 204 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc847a95/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 927c3a2..db24f9d 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -26,13 +26,16 @@ import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupBy
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundMultiWrapper;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundWrapper;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -64,12 +67,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
-import org.apache.flink.streaming.api.functions.TimestampAssigner;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
-import org.apache.kafka.common.utils.Time;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +103,9 @@ public class FlinkStreamingTransformTranslators {
 TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
 TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
 TRANSLATORS.put(TextIO.Write.Bound.class, new 
TextIOWriteBoundStreamingTranslator());
+
+TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+
 TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
 TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
 TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
@@ -193,6 +195,29 @@ public class FlinkStreamingTransformTranslators {
 }
   }
 
+  private static class WriteSinkStreamingTranslator implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator {
+
+@Override
+public void translateNode(Write.Bound transform, 
FlinkStreamingTranslationContext context) {
+  String name = transform.getName();
+  PValue input = context.getInput(transform);
+
+  Sink sink = transform.getSink();
+  if (!(sink instanceof UnboundedFlinkSink)) {
+throw new UnsupportedOperationException("At the time, only unbounded 
Flink sinks are supported.");
+  }
+
+  DataStream inputDataSet = 
context.getInputDataStream(input);
+
+  inputDataSet.flatMap(new FlatMapFunction() {
+

[4/4] incubator-beam git commit: This closes #266

2016-04-29 Thread mxm
This closes #266


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/661a4a89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/661a4a89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/661a4a89

Branch: refs/heads/master
Commit: 661a4a893c5afa2f257969bd25d4c01c42693fac
Parents: 4fd9d74 63bce07
Author: Maximilian Michels 
Authored: Fri Apr 29 17:58:11 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Apr 29 17:58:11 2016 +0200

--
 .../examples/streaming/KafkaIOExamples.java | 337 +++
 .../FlinkStreamingTransformTranslators.java |  46 ++-
 .../flink/translation/types/FlinkCoder.java |  64 
 .../streaming/io/UnboundedFlinkSink.java| 175 ++
 .../streaming/io/UnboundedFlinkSource.java  |  12 +-
 5 files changed, 625 insertions(+), 9 deletions(-)
--




[1/4] incubator-beam git commit: add Kafka IO examples

2016-04-29 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 4fd9d74df -> 661a4a893


add Kafka IO examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63bce07d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63bce07d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63bce07d

Branch: refs/heads/master
Commit: 63bce07d8c6cc5e610ad24e915e2585fef582567
Parents: aead96f
Author: Maximilian Michels 
Authored: Thu Apr 28 12:02:05 2016 +0200
Committer: Maximilian Michels 
Committed: Fri Apr 29 17:58:00 2016 +0200

--
 .../examples/streaming/KafkaIOExamples.java | 337 +++
 1 file changed, 337 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63bce07d/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
--
diff --git 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
new file mode 100644
index 000..af6bb35
--- /dev/null
+++ 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Recipes/Examples that demonstrate how to read/write data from/to Kafka.
+ */
+public class KafkaIOExamples {
+
+
+  private static final String KAFKA_TOPIC = "input";  // Default kafka topic 
to read from
+  private static final String KAFKA_AVRO_TOPIC = "output";  // Default kafka 
topic to read from
+  private static final String KAFKA_BROKER = "localhost:9092";  // Default 
kafka broker to contact
+  private static final String GROUP_ID = "myGroup";  // Default groupId
+  private static final String ZOOKEEPER = "localhost:2181";  // Default 
zookeeper to connect to for Kafka
+
+  /**
+   * Read/Write String data to Kafka
+   */
+  public static class KafkaString {
+
+/**
+ * Read String data from Kafka
+ */
+public static class ReadStringFromKafka {
+
+  public static void main(String[] args) {
+
+Pipeline p = initializePipeline(args);
+KafkaOptions options = getOptions(p);
+
+FlinkKafkaConsumer08 kafkaConsumer =
+new 

[GitHub] incubator-beam pull request: Add option to use Flink's Kafka Write...

2016-04-29 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/266

Add option to use Flink's Kafka Write IO

This pull request adds the counterpart of the UnboundedFlinkSource: the 
`UnboundedFlinkSink`, which uses the `Write` API. Users have requested this 
multiple times, e.g. to use the Flink Kafka Producer in a Beam program. In the 
long run we will rely solely on Beam IO interfaces, and I would like to replace 
the custom Flink sources and sinks as soon as the relevant connectors are in 
place for users. In the meantime, users can explore the potential of Beam with 
native backend connectors as well.
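
A minimal usage sketch, assuming a hypothetical `UnboundedFlinkSink.of(...)` 
construction helper (the exact construction API is not shown in this thread); 
`Write.to(...)` and Flink's native `FlinkKafkaProducer08` are existing APIs:

    // Hedged sketch only: UnboundedFlinkSink.of(...) is a hypothetical helper,
    // not an API confirmed by this pull request. `options` is assumed to be
    // configured for the Flink runner in streaming mode.
    Pipeline p = Pipeline.create(options);

    FlinkKafkaProducer08<String> kafkaProducer = new FlinkKafkaProducer08<>(
        "localhost:9092", "output", new SimpleStringSchema());

    p.apply(Create.of("hello", "beam"))                       // any PCollection<String>
     .apply(Write.to(UnboundedFlinkSink.of(kafkaProducer)));  // hypothetical construction
    p.run();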



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam kafkaSink-pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #266


commit 3780b01f9ff0a2ffb645b961e127c50ae97affd8
Author: Maximilian Michels <m...@apache.org>
Date:   2016-04-22T10:33:26Z

Kafka sink implementation

commit 1db316971b6ecd0a27cefb0408266c914c1f7d89
Author: Maximilian Michels <m...@apache.org>
Date:   2016-04-28T10:00:18Z

fix Flink source coder handling

commit fff968b03177ba53f3bdad2055f67dc5633d5628
Author: Maximilian Michels <m...@apache.org>
Date:   2016-04-28T10:02:05Z

add Kafka IO examples






[1/2] incubator-beam git commit: [BEAM-207] Flink test flake in ReadSourceStreamingITCase

2016-04-20 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master b8951c231 -> d5b1d5135


[BEAM-207] Flink test flake in ReadSourceStreamingITCase


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd8bc93e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd8bc93e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd8bc93e

Branch: refs/heads/master
Commit: dd8bc93ec3104c481a9ea646406194c1116dae71
Parents: 70e6a13
Author: Maximilian Michels 
Authored: Tue Apr 19 09:20:30 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Apr 20 19:17:50 2016 +0200

--
 .../flink/translation/wrappers/SourceInputFormat.java   | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd8bc93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index dc11c77..debd1a1 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -46,8 +46,8 @@ public class SourceInputFormat implements InputFormat
   private transient PipelineOptions options;
   private final SerializedPipelineOptions serializedOptions;
 
-  private transient BoundedSource.BoundedReader reader = null;
-  private boolean inputAvailable = true;
+  private transient BoundedSource.BoundedReader reader;
+  private boolean inputAvailable = false;
 
   public SourceInputFormat(BoundedSource initialSource, PipelineOptions 
options) {
 this.initialSource = initialSource;
@@ -135,6 +135,9 @@ public class SourceInputFormat implements InputFormat
 
   @Override
   public void close() throws IOException {
-reader.close();
+// TODO null check can be removed once FLINK-3796 is fixed
+if (reader != null) {
+  reader.close();
+}
   }
 }



[GitHub] incubator-beam pull request: [BEAM-207] Flink test flake in ReadSo...

2016-04-19 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/209

[BEAM-207] Flink test flake in ReadSourceStreamingITCase

The `configure(..)` life cycle method is only called on the master but not 
on the worker nodes. This may lead to an incorrect initialization of the 
`Reader` because the `PipelineOptions` haven't been initialized.
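
As a quick illustration, the guard introduced here boils down to the following 
(condensed from the commit diff earlier in this thread):

    // Condensed from the SourceInputFormat change in this PR: the reader stays
    // null and no input is reported until open() has actually been called.
    private transient BoundedSource.BoundedReader<T> reader;
    private boolean inputAvailable = false;

    @Override
    public void close() throws IOException {
      // TODO null check can be removed once FLINK-3796 is fixed
      if (reader != null) {
        reader.close();
      }
    }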

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-207

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #209


commit a95e67c482d3378cf472e75a47275cedbf70de41
Author: Maximilian Michels <m...@apache.org>
Date:   2016-04-19T07:20:30Z

[BEAM-207] Flink test flake in ReadSourceStreamingITCase

The configure(..) life cycle method is only called on the master but not
on the worker nodes. This may lead to an incorrect initialization of the
Reader because the PipelineOptions haven't been initialized.






[2/3] incubator-beam git commit: [BEAM-196] abstraction for PipelineOptions serialization

2016-04-19 Thread mxm
[BEAM-196] abstraction for PipelineOptions serialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81577b31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81577b31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81577b31

Branch: refs/heads/master
Commit: 81577b31c2642522f7dd4ba8eba794df48a0ca56
Parents: 56e28a9
Author: Maximilian Michels 
Authored: Mon Apr 18 17:40:38 2016 +0200
Committer: Maximilian Michels 
Committed: Mon Apr 18 18:10:05 2016 +0200

--
 .../utils/SerializedPipelineOptions.java| 54 
 .../beam/runners/flink/PipelineOptionsTest.java | 68 
 2 files changed, 122 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
new file mode 100644
index 000..7439e02
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.translation.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the 
cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  private byte[] serializedOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+
+try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+  new ObjectMapper().writeValue(baos, options);
+  this.serializedOptions = baos.toByteArray();
+} catch (Exception e) {
+  throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+}
+
+  }
+
+  public PipelineOptions deserializeOptions() {
+try {
+  return new ObjectMapper().readValue(serializedOptions, 
PipelineOptions.class);
+} catch (IOException e) {
+  throw new RuntimeException("Couldn't deserialize the PipelineOptions.", 
e);
+}
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
new file mode 100644
index 000..464c6df
--- /dev/null
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.

[3/3] incubator-beam git commit: This closes #200

2016-04-19 Thread mxm
This closes #200


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70e6a131
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70e6a131
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70e6a131

Branch: refs/heads/master
Commit: 70e6a1310fd3871fdd10287d6a50e24edb7f6f18
Parents: bf78e96 43b5ec7
Author: Maximilian Michels 
Authored: Tue Apr 19 09:44:51 2016 +0200
Committer: Maximilian Michels 
Committed: Tue Apr 19 09:44:51 2016 +0200

--
 .../functions/FlinkDoFnFunction.java| 25 ++-
 .../functions/FlinkMultiOutputDoFnFunction.java | 27 ++--
 .../utils/SerializedPipelineOptions.java| 63 ++
 .../translation/wrappers/SinkOutputFormat.java  | 28 ++--
 .../translation/wrappers/SourceInputFormat.java | 24 ++-
 .../FlinkGroupAlsoByWindowWrapper.java  | 53 ++-
 .../streaming/io/UnboundedSourceWrapper.java| 28 ++--
 .../beam/runners/flink/PipelineOptionsTest.java | 68 
 8 files changed, 157 insertions(+), 159 deletions(-)
--




[GitHub] incubator-beam pull request: [BEAM-196] Pipeline options must be a...

2016-04-18 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/200

[BEAM-196] Pipeline options must be available in the Context in DoFn.startBundle

This gets rid of the custom Java serialization code by defaulting to 
serializing the `PipelineOptions` to a byte array. So far, this has proven to 
be the most hassle-free method for the Flink Runner. To enable code reuse and 
avoid deserializing the byte array multiple times, the 
`SerializedPipelineOptions` class has been introduced.

The changes also make the options accessible in the context of the `DoFn` 
function.
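
As a rough usage sketch: the holder class and its methods are taken from this 
PR's first commit, while the surrounding Flink RichMapFunction is only an 
illustrative host, not part of the change:

    import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Illustrative host class, not part of this PR.
    public class OptionsAwareFn extends RichMapFunction<String, String> {

      private final SerializedPipelineOptions serializedOptions;
      private transient PipelineOptions options;

      public OptionsAwareFn(PipelineOptions options) {
        // Serialize once on the client ...
        this.serializedOptions = new SerializedPipelineOptions(options);
      }

      @Override
      public void open(Configuration parameters) throws Exception {
        // ... and deserialize once per task, not once per element.
        this.options = serializedOptions.deserializeOptions();
      }

      @Override
      public String map(String value) throws Exception {
        return value;  // options are available here and in bundle hooks
      }
    }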

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #200


commit 81577b31c2642522f7dd4ba8eba794df48a0ca56
Author: Maximilian Michels <m...@apache.org>
Date:   2016-04-18T15:40:38Z

[BEAM-196] abstraction for PipelineOptions serialization

commit 43b5ec743718e63c2d9d9532e3ca55bc87370290
Author: Maximilian Michels <m...@apache.org>
Date:   2016-04-18T15:40:50Z

[BEAM-196] make use of SerializedPipelineOptions






[3/4] incubator-beam git commit: [flink] improve InputFormat wrapper and ReadSourceITCase

2016-04-18 Thread mxm
[flink] improve InputFormat wrapper and ReadSourceITCase


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6eac35e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6eac35e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6eac35e8

Branch: refs/heads/master
Commit: 6eac35e81e93c25da4668fc1b0d30f7c942383f0
Parents: 7646384
Author: Maximilian Michels 
Authored: Wed Mar 30 16:43:04 2016 +0200
Committer: Maximilian Michels 
Committed: Mon Apr 18 16:36:43 2016 +0200

--
 .../translation/wrappers/SourceInputFormat.java |  83 +++
 .../beam/runners/flink/ReadSourceITCase.java| 100 ++-
 2 files changed, 43 insertions(+), 140 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6eac35e8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 26e6297..4b11abc 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -23,20 +23,20 @@ import org.apache.beam.sdk.options.PipelineOptions;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
+
 /**
  * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
  * Dataflow {@link org.apache.beam.sdk.io.Source}.
@@ -45,37 +45,40 @@ public class SourceInputFormat implements InputFormat
   private static final Logger LOG = 
LoggerFactory.getLogger(SourceInputFormat.class);
 
   private final BoundedSource initialSource;
+
   private transient PipelineOptions options;
+  private final byte[] serializedOptions;
 
-  private BoundedSource.BoundedReader reader = null;
-  private boolean reachedEnd = true;
+  private transient BoundedSource.BoundedReader reader = null;
+  private boolean inputAvailable = true;
 
   public SourceInputFormat(BoundedSource initialSource, PipelineOptions 
options) {
 this.initialSource = initialSource;
 this.options = options;
-  }
 
-  private void writeObject(ObjectOutputStream out)
-  throws IOException, ClassNotFoundException {
-out.defaultWriteObject();
-ObjectMapper mapper = new ObjectMapper();
-mapper.writeValue(out, options);
-  }
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+try {
+  new ObjectMapper().writeValue(baos, options);
+  serializedOptions = baos.toByteArray();
+} catch (Exception e) {
+  throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+}
 
-  private void readObject(ObjectInputStream in)
-  throws IOException, ClassNotFoundException {
-in.defaultReadObject();
-ObjectMapper mapper = new ObjectMapper();
-options = mapper.readValue(in, PipelineOptions.class);
   }
 
   @Override
-  public void configure(Configuration configuration) {}
+  public void configure(Configuration configuration) {
+try {
+  options = new ObjectMapper().readValue(serializedOptions, 
PipelineOptions.class);
+} catch (IOException e) {
+  throw new RuntimeException("Couldn't deserialize the PipelineOptions.", 
e);
+}
+  }
 
   @Override
   public void open(SourceInputSplit sourceInputSplit) throws IOException {
 reader = ((BoundedSource) 
sourceInputSplit.getSource()).createReader(options);
-reachedEnd = false;
+inputAvailable = reader.start();
   }
 
   @Override
@@ -87,7 +90,6 @@ public class SourceInputFormat implements InputFormat
 @Override
 public long getTotalInputSize() {
   return estimatedSize;
-
 }
 
 @Override
@@ -110,17 +112,15 @@ public class SourceInputFormat implements 
InputFormat
   @Override
   @SuppressWarnings("unchecked")
   

[2/4] incubator-beam git commit: [flink] improvements to UnboundedSource translation

2016-04-18 Thread mxm
[flink] improvements to UnboundedSource translation


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c4f2dc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c4f2dc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c4f2dc1

Branch: refs/heads/master
Commit: 7c4f2dc1e74d7d985fef80cc3cbccb6e390d16aa
Parents: 6eac35e
Author: Maximilian Michels 
Authored: Wed Mar 30 19:05:27 2016 +0200
Committer: Maximilian Michels 
Committed: Mon Apr 18 16:36:43 2016 +0200

--
 .../FlinkStreamingTransformTranslators.java| 17 -
 1 file changed, 12 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c4f2dc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 541cd40..a1e9f47 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -194,19 +194,26 @@ public class FlinkStreamingTransformTranslators {
 
   DataStream source;
   if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) 
{
-UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) 
transform.getSource();
+@SuppressWarnings("unchecked")
+UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) 
transform.getSource();
 source = context.getExecutionEnvironment()
 .addSource(flinkSource.getFlinkSource())
-.flatMap(new FlatMapFunction() {
+.flatMap(new FlatMapFunction() {
   @Override
-  public void flatMap(String s, Collector 
collector) throws Exception {
-collector.collect(WindowedValue.of(s, Instant.now(), 
GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+  public void flatMap(T s, Collector collector) 
throws Exception {
+collector.collect(
+WindowedValue.of(
+s,
+Instant.now(),
+GlobalWindow.INSTANCE,
+PaneInfo.NO_FIRING));
   }
-}).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
+}).assignTimestampsAndWatermarks(new 
IngestionTimeExtractor());
   } else {
 source = context.getExecutionEnvironment()
 .addSource(new 
UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
   }
+
   context.setOutputDataStream(output, source);
 }
   }



[GitHub] incubator-beam pull request: [BEAM-158] add support for bounded so...

2016-03-31 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/104

[BEAM-158] add support for bounded sources in streaming

Apart from a few improvements, this PR introduces bounded sources in 
streaming. The BoundedSource wrapper (`SourceInputFormat`) is the same as for 
the batch part of the runner. The translator assigns ingestion time watermarks 
and processing time timestamps upon reading from the source. We could make this 
more flexible in terms of watermark generation if we had an UnboundedSource 
wrapper for a BoundedSource.

Perhaps we could have a common utility for runners to deal with serialization 
of PipelineOptions. At some point, they have to be shipped. I had to change the 
serialization code because I was experiencing a serialization bug which led to 
a serialization loop. Debugging this was almost impossible because the stack 
trace doesn't show all serialization calls due to some magic in the VM. I 
didn't find any cyclic references between the PipelineOptions and Flink 
components. I'm assuming this is a bug and the workaround using byte array 
serialization of the options is fair enough. See `SourceInputFormat`.
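
The watermark and timestamp assignment mentioned above is condensed below from 
the second commit of this PR ([flink] improvements to UnboundedSource 
translation); generics and error handling are trimmed:

    // Condensed from FlinkStreamingTransformTranslators in this PR: elements read
    // from a native Flink source are wrapped with processing-time timestamps and
    // ingestion-time watermarks are assigned downstream.
    source = flinkSource
        .flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
          @Override
          public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
            collector.collect(
                WindowedValue.of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
          }
        })
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());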



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-158

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #104


commit f26674e7a7b30ee3f992edfc8e473df2a7ee3e80
Author: Maximilian Michels <m...@apache.org>
Date:   2016-03-30T14:43:04Z

[flink] improve InputFormat wrapper and ReadSourceITCase

commit 03404c7f4a5656bb5c5c0a2510f12e33292fef01
Author: Maximilian Michels <m...@apache.org>
Date:   2016-03-30T17:05:27Z

[flink] improvements to UnboundedSource translation

commit de574136a5b4ba9b75231b321d0190e23af3bac2
Author: Maximilian Michels <m...@apache.org>
Date:   2016-03-31T08:18:01Z

[BEAM-158] add support for bounded sources in streaming






[3/3] incubator-beam git commit: This closes #94.

2016-03-31 Thread mxm
This closes #94.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96e286fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96e286fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96e286fe

Branch: refs/heads/master
Commit: 96e286fec758bb451ff383e6e7c3f2b5bb0cb840
Parents: 0c47cad 63a7c3d
Author: Maximilian Michels 
Authored: Thu Mar 31 11:11:09 2016 +0200
Committer: Maximilian Michels 
Committed: Thu Mar 31 11:11:09 2016 +0200

--
 .../FlinkGroupAlsoByWindowWrapper.java  | 22 +---
 1 file changed, 14 insertions(+), 8 deletions(-)
--




[1/3] incubator-beam git commit: [flink] improve lifecycle handling of GroupAlsoByWindowWrapper

2016-03-31 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0c47cad48 -> 96e286fec


[flink] improve lifecycle handling of GroupAlsoByWindowWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/033b9240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/033b9240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/033b9240

Branch: refs/heads/master
Commit: 033b9240765543438068c1adea6d0cff34ddcd53
Parents: 17863c8
Author: Maximilian Michels 
Authored: Mon Mar 28 11:31:38 2016 +0200
Committer: Maximilian Michels 
Committed: Wed Mar 30 11:31:56 2016 +0200

--
 .../wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java  | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/033b9240/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index b413d7a..751d44c 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -220,6 +220,7 @@ public class FlinkGroupAlsoByWindowWrapper
   public void open() throws Exception {
 super.open();
 this.context = new ProcessContext(operator, new 
TimestampedCollector<>(output), this.timerInternals);
+operator.startBundle(context);
   }
 
   /**
@@ -252,11 +253,7 @@ public class FlinkGroupAlsoByWindowWrapper
 
   private void processKeyedWorkItem(KeyedWorkItem workItem) throws 
Exception {
 context.setElement(workItem, getStateInternalsForKey(workItem.key()));
-
-// TODO: Ideally startBundle/finishBundle would be called when the 
operator is first used / about to be discarded.
-operator.startBundle(context);
 operator.processElement(context);
-operator.finishBundle(context);
   }
 
   @Override
@@ -309,6 +306,7 @@ public class FlinkGroupAlsoByWindowWrapper
 
   @Override
   public void close() throws Exception {
+operator.finishBundle(context);
 super.close();
   }
 



[GitHub] incubator-beam pull request: [flink] improve lifecycle handling of...

2016-03-30 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/94

[flink] improve lifecycle handling of GroupAlsoByWindowWrapper 

Could someone have a look at whether these minor changes are good to merge? 
@davorbonaci @kennknowles 

Note that there are two commits: one for the lifecycle of the Operator, 
another for improving code readability.
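
Simplified from the first commit, the relevant lifecycle hooks now look like 
this (only the touched methods are shown):

    // Simplified from FlinkGroupAlsoByWindowWrapper after the first commit:
    // startBundle/finishBundle move out of the per-element path.
    @Override
    public void open() throws Exception {
      super.open();
      context = new ProcessContext(operator, new TimestampedCollector<>(output), timerInternals);
      operator.startBundle(context);      // once, when the operator is opened
    }

    private void processKeyedWorkItem(KeyedWorkItem workItem) throws Exception {
      context.setElement(workItem, getStateInternalsForKey(workItem.key()));
      operator.processElement(context);   // no per-element start/finishBundle any more
    }

    @Override
    public void close() throws Exception {
      operator.finishBundle(context);     // once, before the operator is discarded
      super.close();
    }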

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam lifecycle

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/94.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #94


commit 033b9240765543438068c1adea6d0cff34ddcd53
Author: Maximilian Michels <m...@apache.org>
Date:   2016-03-28T09:31:38Z

[flink] improve lifecycle handling of GroupAlsoByWindowWrapper

commit 63a7c3d0cb51caf65dc82141671cf28d47c2be39
Author: Maximilian Michels <m...@apache.org>
Date:   2016-03-30T10:02:01Z

[flink] improve readability of processElement function






[GitHub] incubator-beam pull request: [BEAM-149] move language source confi...

2016-03-24 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/73

[BEAM-149] move language source config to pluginManagement

This fixes issues with the source/target level in IntelliJ.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-149

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/73.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #73


commit d869d7d371654b694bde498ae6b2b4a1d320850c
Author: Maximilian Michels <m...@apache.org>
Date:   2016-03-24T17:54:05Z

[BEAM-149] move language source config to pluginManagement

This fixes issues with the source/target level in IntelliJ.






[05/10] incubator-beam git commit: Update AutoComplete.java

2016-03-23 Thread mxm
Update AutoComplete.java

Allow for the Datastore dependency of this test to be in a different project 
than the main project for the job.
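
The relevant fallback is a one-liner using Guava's MoreObjects, as in the diff 
below:

    // From the diff below: fall back to the project ID when no dataset is set.
    String dataset = MoreObjects.firstNonNull(options.getOutputDataset(), options.getProject());
    toWrite
        .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
        .apply(DatastoreIO.writeTo(dataset));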

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/834d0710
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/834d0710
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/834d0710

Branch: refs/heads/master
Commit: 834d071060cb916bedc0859baa256791ab22b7d4
Parents: 158f9f8
Author: sammcveety 
Authored: Tue Mar 22 11:58:19 2016 -0700
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 .../cloud/dataflow/examples/complete/AutoComplete.java| 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834d0710/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
--
diff --git 
a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
 
b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
index 1bccc4a..f897338 100644
--- 
a/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
+++ 
b/examples/src/main/java/com/google/cloud/dataflow/examples/complete/AutoComplete.java
@@ -57,6 +57,7 @@ import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PBegin;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
 import org.joda.time.Duration;
@@ -432,10 +433,14 @@ public class AutoComplete {
 Boolean getOutputToBigQuery();
 void setOutputToBigQuery(Boolean value);
 
-@Description("Whether output to Datastoree")
+@Description("Whether output to Datastore")
 @Default.Boolean(false)
 Boolean getOutputToDatastore();
 void setOutputToDatastore(Boolean value);
+
+@Description("Datastore output dataset ID, defaults to project ID")
+String getOutputDataset();
+void setOutputDataset(String value);
   }
 
   public static void main(String[] args) throws IOException {
@@ -477,7 +482,8 @@ public class AutoComplete {
 if (options.getOutputToDatastore()) {
   toWrite
   .apply(ParDo.named("FormatForDatastore").of(new 
FormatForDatastore(options.getKind(
-  .apply(DatastoreIO.writeTo(options.getProject()));
+  .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
+  options.getOutputDataset(), options.getProject(;
 }
 if (options.getOutputToBigQuery()) {
   dataflowUtils.setupBigQueryTable();



[10/10] incubator-beam git commit: This closes #69

2016-03-23 Thread mxm
This closes #69


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f902582
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f902582
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f902582

Branch: refs/heads/master
Commit: 2f902582c1a52c1bb95edc647007eb4a83964508
Parents: 8d87ee0 1504ba7
Author: Maximilian Michels 
Authored: Wed Mar 23 19:30:00 2016 +0100
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:30:00 2016 +0100

--
 .../streaming/io/UnboundedSourceWrapper.java|  87 +---
 .../flink/streaming/UnboundedSourceITCase.java  | 210 +++
 2 files changed, 272 insertions(+), 25 deletions(-)
--




[03/10] incubator-beam git commit: Add DisplayData builder API to SDK

2016-03-23 Thread mxm
Add DisplayData builder API to SDK

This allows generating the display data which will be attached to
PTransforms.
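
A minimal sketch of what a DoFn can now override; the concrete 
`DisplayData.Builder` methods are not shown in this excerpt, so the `add(...)` 
call below is an assumption:

    // Sketch only: builder.add(...) is an assumed Builder method, not confirmed here.
    static class RegexFn extends DoFn<String, String> {
      private final String pattern = "[a-z]+";

      @Override
      public void processElement(ProcessContext c) throws Exception {
        c.output(c.element());
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder) {
        builder.add("pattern", pattern);  // assumed Builder signature
      }
    }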


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ecb7aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ecb7aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ecb7aa7

Branch: refs/heads/master
Commit: 5ecb7aa7a8ac107e2bdb8518da2bee714ceba122
Parents: cb5d6c2
Author: Scott Wegner 
Authored: Thu Mar 17 10:22:42 2016 -0700
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 sdk/pom.xml |   7 +
 .../cloud/dataflow/sdk/transforms/DoFn.java |  13 +-
 .../dataflow/sdk/transforms/PTransform.java |  14 +-
 .../cloud/dataflow/sdk/transforms/ParDo.java|  13 +
 .../sdk/transforms/display/DisplayData.java | 517 +++
 .../sdk/transforms/display/HasDisplayData.java  |  53 ++
 .../cloud/dataflow/sdk/transforms/DoFnTest.java |  15 +
 .../dataflow/sdk/transforms/PTransformTest.java |  41 ++
 .../dataflow/sdk/transforms/ParDoTest.java  |  23 +
 .../transforms/display/DisplayDataMatchers.java |  98 +++
 .../display/DisplayDataMatchersTest.java|  81 +++
 .../sdk/transforms/display/DisplayDataTest.java | 633 +++
 .../cloud/dataflow/sdk/util/ApiSurfaceTest.java |   3 +-
 13 files changed, 1508 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ecb7aa7/sdk/pom.xml
--
diff --git a/sdk/pom.xml b/sdk/pom.xml
index 71f5097..185abc2 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -615,6 +615,13 @@
 
 
 
+  com.google.guava
+  guava-testlib
+  ${guava.version}
+  test
+
+
+
   com.google.protobuf
   protobuf-java
   ${protobuf.version}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ecb7aa7/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
--
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
index af06cc8..5ba9992 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
@@ -24,6 +24,8 @@ import com.google.cloud.dataflow.sdk.annotations.Experimental;
 import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
+import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
 import com.google.cloud.dataflow.sdk.util.WindowingInternals;
@@ -69,7 +71,7 @@ import java.util.UUID;
  * @param  the type of the (main) input elements
  * @param  the type of the (main) output elements
  */
-public abstract class DoFn implements Serializable {
+public abstract class DoFn implements Serializable, 
HasDisplayData {
 
   /**
* Information accessible to all methods in this {@code DoFn}.
@@ -366,6 +368,15 @@ public abstract class DoFn implements 
Serializable {
   public void finishBundle(Context c) throws Exception {
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * By default, does not register any display data. Implementors may 
override this method
+   * to provide their own display metadata.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+  }
 
   /
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ecb7aa7/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
--
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
index 8a74509..d4496b8 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java
@@ -19,6 +19,8 @@ package com.google.cloud.dataflow.sdk.transforms;
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;

[02/10] incubator-beam git commit: Implement InProcessPipelineRunner#run

2016-03-23 Thread mxm
Implement InProcessPipelineRunner#run

Appropriately construct an evaluation context and executor, and start
the pipeline when run is called.

Implement InProcessPipelineResult.

Apply PTransform overrides.
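
A minimal usage sketch (the pipeline contents below are illustrative and not part of
this commit): selecting the in-process runner explicitly and keeping the returned
result.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Create;

public class InProcessRunnerSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(InProcessPipelineRunner.class);

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("a", "b", "c"))
        .apply(Count.<String>perElement());

    // run() constructs the evaluation context and executor described above and
    // starts the pipeline; the result can then be inspected for its state.
    PipelineResult result = p.run();
    System.out.println(result.getState());
  }
}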


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/158f9f8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/158f9f8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/158f9f8d

Branch: refs/heads/master
Commit: 158f9f8d41c63f5a002c6187f4f05f169579dd6d
Parents: 5ecb7aa
Author: Thomas Groh 
Authored: Fri Feb 26 17:30:13 2016 -0800
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 .../CachedThreadPoolExecutorServiceFactory.java |  42 
 .../ConsumerTrackingPipelineVisitor.java| 173 ++
 .../inprocess/ExecutorServiceFactory.java   |  32 +++
 .../ExecutorServiceParallelExecutor.java|   2 +-
 .../inprocess/GroupByKeyEvaluatorFactory.java   |   4 +-
 .../inprocess/InProcessPipelineOptions.java |  56 +
 .../inprocess/InProcessPipelineRunner.java  | 228 +++---
 .../inprocess/KeyedPValueTrackingVisitor.java   |  95 
 .../ConsumerTrackingPipelineVisitorTest.java| 233 +++
 .../inprocess/InProcessPipelineRunnerTest.java  |  77 ++
 .../KeyedPValueTrackingVisitorTest.java | 189 +++
 11 files changed, 1101 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/158f9f8d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
--
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
new file mode 100644
index 000..3350d2b
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CachedThreadPoolExecutorServiceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ExecutorServiceFactory} that produces cached thread pools via
+ * {@link Executors#newCachedThreadPool()}.
+ */
+class CachedThreadPoolExecutorServiceFactory
+implements DefaultValueFactory, 
ExecutorServiceFactory {
+  private static final CachedThreadPoolExecutorServiceFactory INSTANCE =
+  new CachedThreadPoolExecutorServiceFactory();
+
+  @Override
+  public ExecutorServiceFactory create(PipelineOptions options) {
+return INSTANCE;
+  }
+
+  @Override
+  public ExecutorService create() {
+return Executors.newCachedThreadPool();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/158f9f8d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
--
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
new file mode 100644
index 000..c602b23
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * 

[01/10] incubator-beam git commit: make BigtableIO#Read#withRowFilter public

2016-03-23 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 8d87ee02b -> 2f902582c


make BigtableIO#Read#withRowFilter public


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e39b5d9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e39b5d9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e39b5d9a

Branch: refs/heads/master
Commit: e39b5d9afec2bfa64552a63983eaac1df5da6a8d
Parents: 834d071
Author: Neville Li 
Authored: Wed Mar 23 06:05:51 2016 +0800
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 .../java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e39b5d9a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
--
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
index 81a85fa..7d59b09 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
@@ -211,7 +211,7 @@ public class BigtableIO {
  *
  * Does not modify this object.
  */
-Read withRowFilter(RowFilter filter) {
+public Read withRowFilter(RowFilter filter) {
   checkNotNull(filter, "filter");
   return new Read(options, tableId, filter, bigtableService);
 }



[09/10] incubator-beam git commit: [BEAM-143] [flink] add test for UnboundedSourceWrapper

2016-03-23 Thread mxm
[BEAM-143] [flink] add test for UnboundedSourceWrapper

The test ensures serialization and execution of the wrapper works as
expected.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1504ba7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1504ba7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1504ba7b

Branch: refs/heads/master
Commit: 1504ba7b771173e8ad1cb8b1714d38f2410ef706
Parents: ac5a1e8
Author: Maximilian Michels 
Authored: Wed Mar 23 16:19:27 2016 +0100
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:29:14 2016 +0100

--
 .../flink/streaming/UnboundedSourceITCase.java  | 210 +++
 1 file changed, 210 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1504ba7b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
new file mode 100644
index 000..f36028e
--- /dev/null
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.flink.FlinkTestPipeline;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Instant;
+import org.junit.internal.ArrayComparisonFailure;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+
+public class UnboundedSourceITCase extends StreamingProgramTestBase {
+
+  protected static String resultPath;
+
+  public UnboundedSourceITCase() {
+  }
+
+  static final String[] EXPECTED_RESULT = new String[]{
+  "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+
+Pipeline p = FlinkTestPipeline.createForStreaming();
+
+PCollection result = p
+.apply(Read.from(new RangeReadSource(1, 10)))
+.apply(Window.into(new GlobalWindows())
+

[04/10] incubator-beam git commit: [flink] Add FlinkRunnerRegistrar

2016-03-23 Thread mxm
[flink] Add FlinkRunnerRegistrar

Expose Flink runner and options via AutoService. AutoService will
at compile time populate META-INF/services so that the Dataflow SDK
can seamlessly pick up the FlinkRunner.

This closes #40.
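
The user-visible effect (exercised by FlinkRunnerRegistrarTest further down) is that
the runner can simply be named on the command line; a small sketch, with the pipeline
contents assumed:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

public class RunnerSelectionSketch {
  public static void main(String[] args) {
    // With FlinkRunnerRegistrar on the classpath, no manual registration is
    // needed; the runner name resolves through META-INF/services.
    String[] flags = new String[] {"--runner=FlinkPipelineRunner"};
    PipelineOptions options = PipelineOptionsFactory.fromArgs(flags).create();
    Pipeline p = Pipeline.create(options);
    // ... build and run the pipeline as usual ...
  }
}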


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e94e7d6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e94e7d6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e94e7d6a

Branch: refs/heads/master
Commit: e94e7d6a5c227eec79d696acd963a88286a487a9
Parents: a20e0b6
Author: Rafal Wojdyla 
Authored: Fri Mar 11 13:39:55 2016 -0500
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 runners/flink/runner/pom.xml| 12 +++--
 .../runners/flink/FlinkRunnerRegistrar.java | 56 
 2 files changed, 63 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e94e7d6a/runners/flink/runner/pom.xml
--
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 212b973..ff4b368 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -84,11 +84,6 @@
 
   
 
-
-  com.google.auto.service
-  auto-service
-  1.0-rc2
-
 
 
   com.google.cloud.dataflow
@@ -121,6 +116,13 @@
   1.9.5
   test
 
+
+
+  com.google.auto.service
+  auto-service
+  1.0-rc2
+  true
+
   
 
   

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e94e7d6a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
new file mode 100644
index 000..3e30ab9
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * AutoService registrar - will register FlinkRunner and FlinkOptions
+ * as possible pipeline runner services.
+ *
+ * It ends up in META-INF/services and gets picked up by Dataflow.
+ *
+ */
+public class FlinkRunnerRegistrar {
+  private FlinkRunnerRegistrar() { }
+
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+@Override
+public Iterable> getPipelineRunners() {
+  return ImmutableList.>of(FlinkPipelineRunner.class);
+}
+  }
+
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+@Override
+public Iterable getPipelineOptions() {
+  return ImmutableList.of(FlinkPipelineOptions.class);
+}
+  }
+}



[06/10] incubator-beam git commit: [BEAM-116] change runners artifactId to runners-parent

2016-03-23 Thread mxm
[BEAM-116] change runners artifactId to runners-parent


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a20e0b64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a20e0b64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a20e0b64

Branch: refs/heads/master
Commit: a20e0b6405b1741f7b5e59bd433a0ab5b038a6b3
Parents: 91de072
Author: Maximilian Michels 
Authored: Wed Mar 16 12:03:07 2016 +0100
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 runners/flink/pom.xml | 2 +-
 runners/pom.xml   | 2 +-
 runners/spark/pom.xml | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a20e0b64/runners/flink/pom.xml
--
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a340107..31713cd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -24,7 +24,7 @@
 
   
 org.apache.beam
-runners
+runners-parent
 1.6.0-SNAPSHOT
   
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a20e0b64/runners/pom.xml
--
diff --git a/runners/pom.xml b/runners/pom.xml
index fbd6c41..b2e9eb1 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -29,7 +29,7 @@
   
 
   org.apache.beam
-  runners
+  runners-parent
   1.6.0-SNAPSHOT
 
   pom

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a20e0b64/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 6dcc95d..9d653a0 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -16,7 +16,7 @@ License.
 
 
 org.apache.beam
-runners
+runners-parent
 1.6.0-SNAPSHOT
 
 



[07/10] incubator-beam git commit: [flink] add test case for Runner registration

2016-03-23 Thread mxm
[flink] add test case for Runner registration


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cb5d6c2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cb5d6c2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cb5d6c2b

Branch: refs/heads/master
Commit: cb5d6c2bf577c0889adc28c06475b5ec22f027da
Parents: e94e7d6
Author: Maximilian Michels 
Authored: Mon Mar 21 11:36:35 2016 +0100
Committer: Maximilian Michels 
Committed: Wed Mar 23 19:27:51 2016 +0100

--
 .../runners/flink/FlinkRunnerRegistrarTest.java | 48 
 1 file changed, 48 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cb5d6c2b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
new file mode 100644
index 000..45ca830
--- /dev/null
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the proper registration of the Flink runner.
+ */
+public class FlinkRunnerRegistrarTest {
+
+  @Test
+  public void testFullName() {
+String[] args =
+new String[] {String.format("--runner=%s", 
FlinkPipelineRunner.class.getName())};
+PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+  }
+
+  @Test
+  public void testClassName() {
+String[] args =
+new String[] {String.format("--runner=%s", 
FlinkPipelineRunner.class.getSimpleName())};
+PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+  }
+
+}



[2/3] incubator-beam git commit: [flink] Add FlinkRunnerRegistrar

2016-03-21 Thread mxm
[flink] Add FlinkRunnerRegistrar

Expose Flink runner and options via AutoService. AutoService will
at compile time populate META-INF/services so that the Dataflow SDK
can seamlessly pick up the FlinkRunner.

This closes #40.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/086a35e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/086a35e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/086a35e9

Branch: refs/heads/master
Commit: 086a35e9d0d63d5ec15f2520146923066995492a
Parents: fcc6f3c
Author: Rafal Wojdyla 
Authored: Fri Mar 11 13:39:55 2016 -0500
Committer: Maximilian Michels 
Committed: Mon Mar 21 11:38:30 2016 +0100

--
 runners/flink/runner/pom.xml| 12 +++--
 .../runners/flink/FlinkRunnerRegistrar.java | 56 
 2 files changed, 63 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/086a35e9/runners/flink/runner/pom.xml
--
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 212b973..ff4b368 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -84,11 +84,6 @@
 
   
 
-
-  com.google.auto.service
-  auto-service
-  1.0-rc2
-
 
 
   com.google.cloud.dataflow
@@ -121,6 +116,13 @@
   1.9.5
   test
 
+
+
+  com.google.auto.service
+  auto-service
+  1.0-rc2
+  true
+
   
 
   

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/086a35e9/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
new file mode 100644
index 000..3e30ab9
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * AutoService registrar - will register FlinkRunner and FlinkOptions
+ * as possible pipeline runner services.
+ *
+ * It ends up in META-INF/services and gets picked up by Dataflow.
+ *
+ */
+public class FlinkRunnerRegistrar {
+  private FlinkRunnerRegistrar() { }
+
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+@Override
+public Iterable> getPipelineRunners() {
+  return ImmutableList.>of(FlinkPipelineRunner.class);
+}
+  }
+
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+@Override
+public Iterable getPipelineOptions() {
+  return ImmutableList.of(FlinkPipelineOptions.class);
+}
+  }
+}



[1/3] incubator-beam git commit: [flink] add test case for Runner registration

2016-03-21 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master fcc6f3cfd -> c984f3ae2


[flink] add test case for Runner registration


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ba7b7a04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ba7b7a04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ba7b7a04

Branch: refs/heads/master
Commit: ba7b7a0447417b8c08c0748a5d60b598da7b8e4e
Parents: 086a35e
Author: Maximilian Michels 
Authored: Mon Mar 21 11:36:35 2016 +0100
Committer: Maximilian Michels 
Committed: Mon Mar 21 11:38:30 2016 +0100

--
 .../runners/flink/FlinkRunnerRegistrarTest.java | 48 
 1 file changed, 48 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ba7b7a04/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
new file mode 100644
index 000..45ca830
--- /dev/null
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the proper registration of the Flink runner.
+ */
+public class FlinkRunnerRegistrarTest {
+
+  @Test
+  public void testFullName() {
+String[] args =
+new String[] {String.format("--runner=%s", 
FlinkPipelineRunner.class.getName())};
+PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+  }
+
+  @Test
+  public void testClassName() {
+String[] args =
+new String[] {String.format("--runner=%s", 
FlinkPipelineRunner.class.getSimpleName())};
+PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+  }
+
+}



[1/2] incubator-beam git commit: [BEAM-116] change runners artifactId to runners-parent

2016-03-20 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master f7aaee2ea -> fcc6f3cfd


[BEAM-116] change runners artifactId to runners-parent


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/447c8af2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/447c8af2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/447c8af2

Branch: refs/heads/master
Commit: 447c8af23b3fa727bba5aa97093699745d6e6f5b
Parents: 5b5c0e2
Author: Maximilian Michels 
Authored: Wed Mar 16 12:03:07 2016 +0100
Committer: Maximilian Michels 
Committed: Fri Mar 18 12:05:07 2016 +0100

--
 runners/flink/pom.xml | 2 +-
 runners/pom.xml   | 2 +-
 runners/spark/pom.xml | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/447c8af2/runners/flink/pom.xml
--
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a340107..31713cd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -24,7 +24,7 @@
 
   
 org.apache.beam
-runners
+runners-parent
 1.6.0-SNAPSHOT
   
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/447c8af2/runners/pom.xml
--
diff --git a/runners/pom.xml b/runners/pom.xml
index fbd6c41..b2e9eb1 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -29,7 +29,7 @@
   
 
   org.apache.beam
-  runners
+  runners-parent
   1.6.0-SNAPSHOT
 
   pom

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/447c8af2/runners/spark/pom.xml
--
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 6dcc95d..9d653a0 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -16,7 +16,7 @@ License.
 
 
 org.apache.beam
-runners
+runners-parent
 1.6.0-SNAPSHOT
 
 



[2/2] incubator-beam git commit: [BEAM-116] This closes #59

2016-03-20 Thread mxm
[BEAM-116] This closes #59


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fcc6f3cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fcc6f3cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fcc6f3cf

Branch: refs/heads/master
Commit: fcc6f3cfd111ab222063e414579c966916299060
Parents: f7aaee2 447c8af
Author: Maximilian Michels 
Authored: Sun Mar 20 19:34:22 2016 +0100
Committer: Maximilian Michels 
Committed: Sun Mar 20 19:48:25 2016 +0100

--
 runners/flink/pom.xml | 2 +-
 runners/pom.xml   | 2 +-
 runners/spark/pom.xml | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--




[GitHub] incubator-beam pull request: [BEAM-116] change runners artifactId ...

2016-03-19 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/incubator-beam/pull/59

[BEAM-116] change runners artifactId to runners-parent



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/incubator-beam BEAM-116

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/59.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #59


commit 447c8af23b3fa727bba5aa97093699745d6e6f5b
Author: Maximilian Michels <m...@apache.org>
Date:   2016-03-16T11:03:07Z

[BEAM-116] change runners artifactId to runners-parent






incubator-beam git commit: [flink] improvements to the Kafka Example

2016-03-19 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0f137169e -> ef1e32dee


[flink] improvements to the Kafka Example

- use timestamp extractor after ingestion
- fix coder runtime exception
- correct logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1e32de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1e32de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1e32de

Branch: refs/heads/master
Commit: ef1e32deefb9886584556c7125e87b2873c63ebf
Parents: 0f13716
Author: Maximilian Michels 
Authored: Thu Mar 17 14:49:09 2016 +0100
Committer: Maximilian Michels 
Committed: Thu Mar 17 14:53:41 2016 +0100

--
 .../examples/streaming/KafkaWindowedWordCountExample.java | 2 +-
 .../flink/translation/FlinkStreamingTransformTranslators.java | 3 ++-
 .../wrappers/streaming/io/UnboundedFlinkSource.java   | 7 +++
 runners/flink/runner/src/main/resources/log4j.properties  | 2 +-
 4 files changed, 7 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
--
diff --git 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 3942d0d..8fca1d4 100644
--- 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -103,7 +103,7 @@ public class KafkaWindowedWordCountExample {
   public static void main(String[] args) {
 PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
 KafkaStreamingWordCountOptions options = 
PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
-options.setJobName("KafkaExample");
+options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() 
+ " seconds");
 options.setStreaming(true);
 options.setCheckpointingInterval(1000L);
 options.setNumberOfExecutionRetries(5);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index bdefeaf..2b9b0ee 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -179,7 +180,7 @@ public class FlinkStreamingTransformTranslators {
   public void flatMap(String s, Collector 
collector) throws Exception {
 collector.collect(WindowedValue.of(s, Instant.now(), 
GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
   }
-});
+}).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
   } else {
 source = context.getExecutionEnvironment()
 .addSource(new 
UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 05a8956..82984cb 100644
--- 

incubator-beam git commit: [flink] fix UnboundedFlinkSource wrapper

2016-03-19 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master a9c46057e -> 0f137169e


[flink] fix UnboundedFlinkSource wrapper

- remove unnecessary PipelineOptions cache
- use the correct interface types
- improve Kafka example


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f137169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f137169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f137169

Branch: refs/heads/master
Commit: 0f137169e4c2cd8d3e5a86c91bc2f401d276e8ed
Parents: a9c4605
Author: Maximilian Michels 
Authored: Thu Mar 17 12:26:03 2016 +0100
Committer: Maximilian Michels 
Committed: Thu Mar 17 12:27:37 2016 +0100

--
 .../KafkaWindowedWordCountExample.java  |  7 ++--
 .../streaming/io/UnboundedFlinkSource.java  | 37 ++--
 2 files changed, 22 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
--
diff --git 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 55cdc22..3942d0d 100644
--- 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -22,7 +22,6 @@ import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.options.Default;
 import com.google.cloud.dataflow.sdk.options.Description;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
@@ -30,7 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.*;
 import com.google.cloud.dataflow.sdk.transforms.windowing.*;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.joda.time.Duration;
 
@@ -121,12 +120,12 @@ public class KafkaWindowedWordCountExample {
 
 // this is the Flink consumer that reads the input to
 // the program from a kafka topic.
-FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>(
+FlinkKafkaConsumer08 kafkaConsumer = new FlinkKafkaConsumer08<>(
 options.getKafkaTopic(),
 new SimpleStringSchema(), p);
 
 PCollection words = pipeline
-.apply(Read.from(new UnboundedFlinkSource(options, 
kafkaConsumer)).named("StreamingWordCount"))
+
.apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer)))
 .apply(ParDo.of(new ExtractWordsFn()))
 
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize(
 
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 2857efd..05a8956 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -22,7 +22,9 @@ import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import 

incubator-beam git commit: [BEAM-126] remove strict job name check

2016-03-18 Thread mxm
Repository: incubator-beam
Updated Branches:
  refs/heads/master 5b5c0e28f -> 81d5ff5a5


[BEAM-126] remove strict job name check


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81d5ff5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81d5ff5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81d5ff5a

Branch: refs/heads/master
Commit: 81d5ff5a561ebcf323caea5bdc4363353e5e60dd
Parents: 5b5c0e2
Author: Maximilian Michels 
Authored: Fri Mar 18 15:46:36 2016 +0100
Committer: Maximilian Michels 
Committed: Fri Mar 18 16:01:48 2016 +0100

--
 .../runners/flink/FlinkPipelineExecutionEnvironment.java | 4 ++--
 .../org/apache/beam/runners/flink/FlinkPipelineRunner.java   | 8 
 2 files changed, 2 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81d5ff5a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 8825ed3..6f93478 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -141,7 +141,7 @@ public class FlinkPipelineExecutionEnvironment {
   if (this.flinkPipelineTranslator == null) {
 throw new RuntimeException("FlinkPipelineTranslator not initialized.");
   }
-  return this.flinkStreamEnv.execute();
+  return this.flinkStreamEnv.execute(options.getJobName());
 } else {
   if (this.flinkBatchEnv == null) {
 throw new RuntimeException("FlinkPipelineExecutionEnvironment not 
initialized.");
@@ -149,7 +149,7 @@ public class FlinkPipelineExecutionEnvironment {
   if (this.flinkPipelineTranslator == null) {
 throw new RuntimeException("FlinkPipelineTranslator not initialized.");
   }
-  return this.flinkBatchEnv.execute();
+  return this.flinkBatchEnv.execute(options.getJobName());
 }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81d5ff5a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index fe773d9..4f53e35 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -87,14 +87,6 @@ public class FlinkPipelineRunner extends 
PipelineRunner {
   LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
 }
 
-// Verify jobName according to service requirements.
-String jobName = flinkOptions.getJobName().toLowerCase();
-Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), 
"JobName invalid; " +
-"the name must consist of only the characters " + "[-a-z0-9], starting 
with a letter " +
-"and ending with a letter " + "or number");
-Preconditions.checkArgument(jobName.length() <= 40,
-"JobName too long; must be no more than 40 characters in length");
-
 // Set Flink Master to [auto] if no option was specified.
 if (flinkOptions.getFlinkMaster() == null) {
   flinkOptions.setFlinkMaster("[auto]");



[09/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

2016-03-15 Thread mxm
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
new file mode 100644
index 000..10c8bbf
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.common.base.Preconditions;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.base.Throwables;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+import java.util.Collection;
+
+/**
+ * An abstract class that encapsulates the common code of the the {@link 
com.google.cloud.dataflow.sdk.transforms.ParDo.Bound}
+ * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} 
wrappers. See the {@link FlinkParDoBoundWrapper} and
+ * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the 
aforementioned transformations.
+ * */
+public abstract class FlinkAbstractParDoWrapper extends 
RichFlatMapFunction {
+
+  private final DoFn doFn;
+  private final WindowingStrategy windowingStrategy;
+  private transient PipelineOptions options;
+
+  private DoFnProcessContext context;
+
+  public FlinkAbstractParDoWrapper(PipelineOptions options, 
WindowingStrategy windowingStrategy, DoFn doFn) {
+Preconditions.checkNotNull(options);
+Preconditions.checkNotNull(windowingStrategy);
+Preconditions.checkNotNull(doFn);
+
+this.doFn = doFn;
+this.options = options;
+this.windowingStrategy = windowingStrategy;
+  }
+
+  private void initContext(DoFn function, 
Collector outCollector) {
+if (this.context == null) {
+  this.context = new DoFnProcessContext(function, outCollector);
+}
+  }
+
+  @Override
+  public void flatMap(WindowedValue value, Collector 
out) throws Exception {
+this.initContext(doFn, out);
+
+// for each window the element belongs to, create a new copy here.
+Collection windows = value.getWindows();
+if (windows.size() <= 1) {
+  processElement(value);
+} else {
+  for (BoundedWindow window : windows) {
+processElement(WindowedValue.of(
+value.getValue(), value.getTimestamp(), window, value.getPane()));
+  }
+}
+  }
+
+  private void processElement(WindowedValue value) throws Exception {
+this.context.setElement(value);
+this.doFn.startBundle(context);
+doFn.processElement(context);
+this.doFn.finishBundle(context);
+  }
+
+  private class DoFnProcessContext extends DoFn.ProcessContext {
+
+private final DoFn fn;
+
+protected final Collector collector;
+
+private 

[03/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

2016-03-15 Thread mxm
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
--
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
deleted file mode 100644
index cd5cd40..000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
- * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}.
- */
-public class SourceInputFormat implements InputFormat {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SourceInputFormat.class);
-
-  private final BoundedSource initialSource;
-  private transient PipelineOptions options;
-
-  private BoundedSource.BoundedReader reader = null;
-  private boolean reachedEnd = true;
-
-  public SourceInputFormat(BoundedSource initialSource, PipelineOptions 
options) {
-this.initialSource = initialSource;
-this.options = options;
-  }
-
-  private void writeObject(ObjectOutputStream out)
-  throws IOException, ClassNotFoundException {
-out.defaultWriteObject();
-ObjectMapper mapper = new ObjectMapper();
-mapper.writeValue(out, options);
-  }
-
-  private void readObject(ObjectInputStream in)
-  throws IOException, ClassNotFoundException {
-in.defaultReadObject();
-ObjectMapper mapper = new ObjectMapper();
-options = mapper.readValue(in, PipelineOptions.class);
-  }
-
-  @Override
-  public void configure(Configuration configuration) {}
-
-  @Override
-  public void open(SourceInputSplit sourceInputSplit) throws IOException {
-reader = ((BoundedSource) 
sourceInputSplit.getSource()).createReader(options);
-reachedEnd = false;
-  }
-
-  @Override
-  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws 
IOException {
-try {
-  final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
-
-  return new BaseStatistics() {
-@Override
-public long getTotalInputSize() {
-  return estimatedSize;
-
-}
-
-@Override
-public long getNumberOfRecords() {
-  return BaseStatistics.NUM_RECORDS_UNKNOWN;
-}
-
-@Override
-public float getAverageRecordWidth() {
-  return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
-}
-  };
-} catch (Exception e) {
-  LOG.warn("Could not read Source statistics: {}", e);
-}
-
-return null;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public SourceInputSplit[] createInputSplits(int numSplits) throws 
IOException {
-long desiredSizeBytes;
-try {
-  desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / 
numSplits;
-  List> shards = 
initialSource.splitIntoBundles(desiredSizeBytes,
-  options);
-  List splits = new ArrayList<>();
-  int splitCount = 0;
-  for (Source shard: shards) {
-splits.add(new SourceInputSplit<>(shard, splitCount++));
- 

[07/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

2016-03-15 Thread mxm
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
new file mode 100644
index 000..e73c456
--- /dev/null
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.base.Joiner;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+
+public class WordCountJoin2ITCase extends JavaProgramTestBase {
+
+  static final String[] WORDS_1 = new String[] {
+  "hi there", "hi", "hi sue bob",
+  "hi sue", "", "bob hi"};
+
+  static final String[] WORDS_2 = new String[] {
+  "hi tim", "beauty", "hooray sue bob",
+  "hi there", "", "please say hi"};
+
+  static final String[] RESULTS = new String[] {
+  "beauty -> Tag1: Tag2: 1",
+  "bob -> Tag1: 2 Tag2: 1",
+  "hi -> Tag1: 5 Tag2: 3",
+  "hooray -> Tag1: Tag2: 1",
+  "please -> Tag1: Tag2: 1",
+  "say -> Tag1: Tag2: 1",
+  "sue -> Tag1: 2 Tag2: 1",
+  "there -> Tag1: 1 Tag2: 1",
+  "tim -> Tag1: Tag2: 1"
+  };
+
+  static final TupleTag tag1 = new TupleTag<>("Tag1");
+  static final TupleTag tag2 = new TupleTag<>("Tag2");
+
+  protected String resultPath;
+
+  @Override
+  protected void preSubmit() throws Exception {
+resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+Pipeline p = FlinkTestPipeline.createForBatch();
+
+/* Create two PCollections and join them */
+PCollection<KV<String, Long>> occurences1 = p.apply(Create.of(WORDS_1))
+.apply(ParDo.of(new ExtractWordsFn()))
+.apply(Count.perElement());
+
+PCollection<KV<String, Long>> occurences2 = p.apply(Create.of(WORDS_2))
+.apply(ParDo.of(new ExtractWordsFn()))
+.apply(Count.perElement());
+
+/* CoGroup the two collections */
+PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple
+.of(tag1, occurences1)
+.and(tag2, occurences2)
+.apply(CoGroupByKey.create());
+
+/* Format output */
+mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+.apply(TextIO.Write.named("test").to(resultPath));
+
+p.run();
+  }
+
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+
+@Override
+public void startBundle(Context c) {
+}
+
+@Override
+public void processElement(ProcessContext c) {
+  // Split the line into words.
+  String[] words = c.element().split("[^a-zA-Z']+");
+
+  // Output each word encountered into the output PCollection.
+  for (String word : words) {
+if (!word.isEmpty()) {
+  c.output(word);
+}
+  }
+}
+  }
+
+  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+@Override
+public void processElement(ProcessContext c) {
+  CoGbkResult value = c.element().getValue();
+  String key = c.element().getKey();
+  String 
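As a rough, hypothetical sketch only (not the original FormatCountsFn, whose body is cut off above), a formatter that produces the RESULTS strings could read the per-tag counts out of the CoGbkResult like this; it assumes it sits inside the test class so that tag1 and tag2 are in scope, and the class name is invented for illustration:

  static class FormatCountsSketchFn extends DoFn<KV<String, CoGbkResult>, String> {
    @Override
    public void processElement(ProcessContext c) {
      String key = c.element().getKey();
      CoGbkResult value = c.element().getValue();

      // Collect every count recorded under each tag; a word may have no entry for a tag.
      StringBuilder tag1Counts = new StringBuilder();
      for (Long count : value.getAll(tag1)) {
        tag1Counts.append(count).append(" ");
      }
      StringBuilder tag2Counts = new StringBuilder();
      for (Long count : value.getAll(tag2)) {
        tag2Counts.append(count).append(" ");
      }

      // Emit one line per word in the "word -> Tag1: ... Tag2: ..." shape of RESULTS.
      c.output(key + " -> Tag1: " + tag1Counts + "Tag2: " + tag2Counts.toString().trim());
    }
  }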

[10/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

2016-03-15 Thread mxm
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
--
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
new file mode 100644
index 000..ca667ee
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingInternals;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.ImmutableList;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encapsulates a {@link com.google.cloud.dataflow.sdk.transforms.DoFn} that uses side outputs
+ * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
+ *
+ * We get a mapping from {@link com.google.cloud.dataflow.sdk.values.TupleTag} to output index
+ * and must tag all outputs with the output number. Afterwards a filter will filter out
+ * those elements that are not to be in a specific output.
+ */
+public class FlinkMultiOutputDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, RawUnionValue> {
+
+  private final DoFn<IN, OUT> doFn;
+  private transient PipelineOptions options;
+  private final Map<TupleTag<?>, Integer> outputMap;
+
+  public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) {
+this.doFn = doFn;
+this.options = options;
+this.outputMap = outputMap;
+  }
+
+  private void writeObject(ObjectOutputStream out)
+  throws IOException, ClassNotFoundException {
+out.defaultWriteObject();
+ObjectMapper mapper = new ObjectMapper();
+mapper.writeValue(out, options);
+  }
+
+  private void readObject(ObjectInputStream in)
+  throws IOException, ClassNotFoundException {
+in.defaultReadObject();
+ObjectMapper mapper = new ObjectMapper();
+options = mapper.readValue(in, PipelineOptions.class);
+
+  }
+
+  @Override
+  public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) throws Exception {
+ProcessContext context = new ProcessContext(doFn, out);
+this.doFn.startBundle(context);
+for (IN value : values) {
+  context.inValue = value;
+  doFn.processElement(context);
+}
+this.doFn.finishBundle(context);
+  }
+
+  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
+
+IN inValue;
+Collector<RawUnionValue> outCollector;
+
+public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> outCollector) {
+  fn.super();
+  this.outCollector = outCollector;
+}
+
+@Override
+public IN element() {
+  return this.inValue;
+}
+
+@Override
+public Instant 
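The class above tags every element it emits with the index of its target output and leaves it to a downstream filter to select one output. A minimal, stand-alone sketch of that tag-then-filter idea, using only the SDK's RawUnionValue; the class name, sample values and output indices here are invented for illustration and are not part of the runner:

import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultiOutputFilterSketch {

  public static void main(String[] args) {
    // Pretend a DoFn emitted "a" and "b" to output 0 (the main output) and "x" to output 1.
    List<RawUnionValue> tagged = Arrays.asList(
        new RawUnionValue(0, "a"),
        new RawUnionValue(1, "x"),
        new RawUnionValue(0, "b"));

    // The filter for output 0 keeps only the elements carrying that union tag.
    List<Object> mainOutput = new ArrayList<>();
    for (RawUnionValue value : tagged) {
      if (value.getUnionTag() == 0) {
        mainOutput.add(value.getValue());
      }
    }

    System.out.println(mainOutput); // prints [a, b]
  }
}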

[05/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

2016-03-15 Thread mxm
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
--
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
deleted file mode 100644
index 71e3b54..000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.io;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-/**
- * Transform for printing the contents of a {@link com.google.cloud.dataflow.sdk.values.PCollection}.
- * to standard output.
- *
- * This is Flink-specific and will only work when executed using the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
- */
-public class ConsoleIO {
-
-  /**
-   * A PTransform that writes a PCollection to a standard output.
-   */
-  public static class Write {
-
-/**
- * Returns a ConsoleIO.Write PTransform with a default step name.
- */
-public static <T> Bound<T> create() {
-  return new Bound<>();
-}
-
-/**
- * Returns a ConsoleIO.Write PTransform with the given step name.
- */
-public static <T> Bound<T> named(String name) {
-  return new Bound<T>().named(name);
-}
-
-/**
- * A PTransform that writes a bounded PCollection to standard output.
- */
-public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-  private static final long serialVersionUID = 0;
-
-  Bound() {
-super("ConsoleIO.Write");
-  }
-
-  Bound(String name) {
-super(name);
-  }
-
-  /**
-   * Returns a new ConsoleIO.Write PTransform that's like this one but with the given
-   * step
-   * name.  Does not modify this object.
-   */
-  public Bound<T> named(String name) {
-    return new Bound<>(name);
-  }
-
-  @Override
-  public PDone apply(PCollection<T> input) {
-return PDone.in(input.getPipeline());
-  }
-}
-  }
-}
-
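ConsoleIO is only removed from this path; the restructuring presumably re-creates it under runners/flink/runner elsewhere in this commit. A hedged usage sketch of the transform as described in the removed Javadoc, assuming local Flink execution with default options; the class name, step name and sample words are invented for illustration:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.io.ConsoleIO;

public class ConsoleIOSketch {
  public static void main(String[] args) {
    // Run on the Flink runner, since ConsoleIO only does anything there.
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(FlinkPipelineRunner.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> words = p.apply(Create.of("hi", "there", "bob"));

    // Print the collection to standard output via the Flink-specific sink.
    words.apply(ConsoleIO.Write.<String>named("PrintWords"));

    p.run();
  }
}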

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
--
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
deleted file mode 100644
index 28a10b7..000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import 

[12/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

2016-03-15 Thread mxm
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/pom.xml
--
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c8c5d84..31713cd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -1,261 +1,167 @@
 
 
 http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
-xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-
-4.0.0
-
-
-org.apache.beam
-runners
-1.6.0-SNAPSHOT
-
-
-flink-runner
-0.3-SNAPSHOT
-
-Flink Beam Runner
-jar
-
-2015
-
-
-
-The Apache Software License, Version 2.0
-http://www.apache.org/licenses/LICENSE-2.0.txt
-repo
-
-
-
-
-UTF-8
-UTF-8
-1.0.0
-1.6.0-SNAPSHOT
-2.10
-
-org.apache.beam.runners.flink.examples.WordCount
-kinglear.txt
-wordcounts.txt
-1
-
-
-
-
-apache.snapshots
-Apache Development Snapshot Repository
-https://repository.apache.org/content/repositories/snapshots/
-
-false
-
-
-true
-
-
-
-
-
-
-org.apache.flink
-flink-core
-${flink.version}
-
-
-org.apache.flink
-flink-streaming-java_${scala.major.version}
-${flink.version}
-
-
-org.apache.flink
-flink-streaming-java_${scala.major.version}
-${flink.version}
-test
-test-jar
-
-
-org.apache.flink
-flink-java
-${flink.version}
-
-
-org.apache.flink
-flink-clients_${scala.major.version}
-${flink.version}
-
-
-org.apache.flink
-flink-test-utils_${scala.major.version}
-${flink.version}
-test
-
-
-org.apache.flink
-flink-connector-kafka-0.8_${scala.major.version}
-${flink.version}
-
-
-org.apache.flink
-flink-avro_${scala.major.version}
-${flink.version}
-
-
-com.google.cloud.dataflow
-google-cloud-dataflow-java-sdk-all
-${beam.version}
-
-
-org.slf4j
-slf4j-jdk14
-
-
-
-
-org.mockito
-mockito-all
-1.9.5
-test
-
-
-
-
-
-
-
-
-org.apache.maven.plugins
-maven-jar-plugin
-2.6
-
-
-
-
true
-
true
-
-
-
-
-
-
-
-
-org.apache.maven.plugins
-maven-compiler-plugin
-3.1
-
-1.7
-1.7
-
-
-
-
-
-maven-failsafe-plugin
-2.17
-
-
-
-integration-test
-verify
-
-
-
-
-1
--Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit
-
-
-
-
-
-org.apache.maven.plugins
-maven-surefire-plugin
-2.17
-
--Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit
-
-
-
-
-
-org.apache.maven.plugins
-maven-eclipse-plugin
-2.8
-
-
-org.eclipse.jdt.launching.JRE_CONTAINER
-
-true
-true
-
-
-
-
-
-org.apache.maven.plugins
-maven-enforcer-plugin
-1.3.1
-
-
-enforce-maven
-
-enforce
-
-
-
-
-[1.7,)
-
-  
