[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=98037=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98037
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 03/May/18 21:01
Start Date: 03/May/18 21:01
Worklog Time Spent: 10m 
  Work Description: jkff closed pull request #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
similarity index 87%
rename from 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java
rename to 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
index fc2cb3dc562..f7adf6de5e1 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.beam.runners.core.construction.graph;
+package org.apache.beam.runners.core.construction;
 
 import java.util.function.Predicate;
 
 /**
- * A utility class to interact with synthetic {@link PipelineNode Pipeline 
Nodes}.
+ * A utility class to interact with synthetic pipeline components.
  */
-class SyntheticNodes {
-  private SyntheticNodes() {}
+public class SyntheticComponents {
+  private SyntheticComponents() {}
 
   /**
* Generate an ID which does not collide with any existing ID, as determined 
by the input
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index ddc03355a90..df3aa5fdfb7 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -30,6 +30,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
 /** A {@link Pipeline} which has been separated into collections of executable 
components. */
@@ -106,7 +107,7 @@ static FusedPipeline of(
   Set usedNames =
   Sets.union(topLevelTransforms.keySet(), 
getComponents().getTransformsMap().keySet());
   topLevelTransforms.put(
-  SyntheticNodes.uniqueId(baseName, usedNames::contains), 
stage.toPTransform());
+  SyntheticComponents.uniqueId(baseName, usedNames::contains), 
stage.toPTransform());
 }
 return topLevelTransforms;
   }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
index 4419787ede1..45c4a27cdcf 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
@@ -39,6 +39,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
@@ -131,7 +132,7 @@ static DeduplicationResult ensureSingleProducer(
   PTransform flattenPartialPCollections =
   createFlattenOfPartials(partialFlattenTargets.getKey(), 
partialFlattenTargets.getValue());
   String flattenId =
-  SyntheticNodes.uniqueId("unzipped_flatten", 
unzippedComponents::containsTransforms);
+  

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97736=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97736
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 03/May/18 00:42
Start Date: 03/May/18 00:42
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185677199
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
 ##
 @@ -14,7 +14,7 @@
  * the License.
  */
 
-package org.apache.beam.runners.fnexecution.graph;
+package org.apache.beam.runners.fnexecution.wire;
 
 Review comment:
   @jkff I just looked at the past runs under 
https://builds.apache.org/view/A-D/view/Beam/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/.
 It looks like _every_ recent run has either been aborted or failed due to 
worker timeout issues.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97736)
Time Spent: 8.5h  (was: 8h 20m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97734=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97734
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 03/May/18 00:19
Start Date: 03/May/18 00:19
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185671288
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97733=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97733
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 03/May/18 00:19
Start Date: 03/May/18 00:19
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185654927
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97705=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97705
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 02/May/18 22:04
Start Date: 02/May/18 22:04
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185631769
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
 ##
 @@ -14,7 +14,7 @@
  * the License.
  */
 
-package org.apache.beam.runners.fnexecution.graph;
+package org.apache.beam.runners.fnexecution.wire;
 
 Review comment:
   The worker shouldn't depend on this in the staged SDK - so this will be fine.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97705)
Time Spent: 7h 50m  (was: 7h 40m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97706=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97706
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 02/May/18 22:04
Start Date: 02/May/18 22:04
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185652571
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97704=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97704
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 02/May/18 22:04
Start Date: 02/May/18 22:04
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185632348
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97679=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97679
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 02/May/18 19:55
Start Date: 02/May/18 19:55
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-386100660
 
 
   Run Dataflow ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97679)
Time Spent: 7h 40m  (was: 7.5h)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97620=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97620
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 02/May/18 18:07
Start Date: 02/May/18 18:07
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-386069062
 
 
   Run Dataflow ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97620)
Time Spent: 7.5h  (was: 7h 20m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97576=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97576
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 02/May/18 16:44
Start Date: 02/May/18 16:44
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-386043265
 
 
   Run Dataflow ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97576)
Time Spent: 7h 20m  (was: 7h 10m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97291=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97291
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 23:24
Start Date: 01/May/18 23:24
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-385819389
 
 
   Run Dataflow ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97291)
Time Spent: 7h 10m  (was: 7h)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97290=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97290
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 23:23
Start Date: 01/May/18 23:23
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185359755
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the 
user-specified options.
+   */
+  public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
+
+LOG.info("Creating a Batch Execution Environment.");
+
+String masterUrl = options.getFlinkMaster();
+ExecutionEnvironment flinkBatchEnv;
+
+// depending on the master, create the right environment.
+if ("[local]".equals(masterUrl)) {
 
 Review comment:
   This is something we do in Beam. It's what was done pre-portability. I think 
we actually want to use a completely different code path in the portable 
runner. It's the responsibility of whoever spins up the JobService to give 
Flink the correct execution environment. In any case, this should not be based 
on user pipeline options but runner startup configuration options.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97290)
Time Spent: 7h  (was: 6h 50m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97264=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97264
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185336352
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97263=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97263
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185347003
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the 
user-specified options.
+   */
+  public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
+
+LOG.info("Creating a Batch Execution Environment.");
+
+String masterUrl = options.getFlinkMaster();
+ExecutionEnvironment flinkBatchEnv;
+
+// depending on the master, create the right environment.
+if ("[local]".equals(masterUrl)) {
+  flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+} else if ("[collection]".equals(masterUrl)) {
+  flinkBatchEnv = new CollectionEnvironment();
+} else if ("[auto]".equals(masterUrl)) {
+  flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+} else if (masterUrl.matches(".*:\\d*")) {
+  String[] parts = masterUrl.split(":");
+  List stagingFiles = options.getFilesToStage();
 
 Review comment:
   In the case of auto, the runner is either executing locally (i.e., all the 
jars are already available on the current classpath) or the job is submitted 
via `flink run`. In the `flink run` case, you pass staging files to the `flink` 
preparation script and that is done automatically.
   
   In any case, we will likely not want this in the portable runner. The 
runner/job service should not have its classpath polluted by user code. 
Instead, the files should only be staged via the DistributedCache. I might 
revert this change and just have the portable runner use its own 
`ExecutionEnvironment` construction semantics. I think that makes sense here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97263)
Time Spent: 5h 50m  (was: 5h 40m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97268=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97268
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185344604
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
 
 Review comment:
   Just a code move. I wish there were a way to indicate this to git.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97268)
Time Spent: 6.5h  (was: 6h 20m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97261=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97261
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185335028
 
 

 ##
 File path: runners/flink/build.gradle
 ##
 @@ -51,6 +51,7 @@ dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-core-java", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
+  shadow project(path: ":beam-runners-java-fn-execution", configuration: 
"shadow")
 
 Review comment:
   I'm not sure what the status is of keeping the pom files updated. We don't 
appear to be running the maven precommits any longer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97261)
Time Spent: 5.5h  (was: 5h 20m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97270=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97270
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185342137
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97267=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97267
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185340333
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97266=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97266
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185346003
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the 
user-specified options.
+   */
+  public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
+
+LOG.info("Creating a Batch Execution Environment.");
+
+String masterUrl = options.getFlinkMaster();
+ExecutionEnvironment flinkBatchEnv;
+
+// depending on the master, create the right environment.
+if ("[local]".equals(masterUrl)) {
+  flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+} else if ("[collection]".equals(masterUrl)) {
 
 Review comment:
   What do you mean?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97266)
Time Spent: 6h 20m  (was: 6h 10m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97262=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97262
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185335332
 
 

 ##
 File path: runners/flink/build.gradle
 ##
 @@ -51,6 +51,7 @@ dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-core-java", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
+  shadow project(path: ":beam-runners-java-fn-execution", configuration: 
"shadow")
 
 Review comment:
   I've gone ahead and added it for now anyway, but it would be good to know 
what's going on here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97262)
Time Spent: 5h 40m  (was: 5.5h)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97260=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97260
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185338177
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97269=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97269
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185344328
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97265=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97265
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 22:06
Start Date: 01/May/18 22:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185343381
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97225=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97225
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 20:58
Start Date: 01/May/18 20:58
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-385788230
 
 
   Run Dataflow ValidatesRunner


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97225)
Time Spent: 5h 20m  (was: 5h 10m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96920=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96920
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151713
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
 
 Review comment:
   
   Is this file just a code move or is there more to it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96920)
Time Spent: 5h  (was: 4h 50m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96911=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96911
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151698
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96912=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96912
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151699
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96919=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96919
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151710
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the 
user-specified options.
+   */
+  public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
+
+LOG.info("Creating a Batch Execution Environment.");
+
+String masterUrl = options.getFlinkMaster();
+ExecutionEnvironment flinkBatchEnv;
+
+// depending on the master, create the right environment.
+if ("[local]".equals(masterUrl)) {
+  flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+} else if ("[collection]".equals(masterUrl)) {
+  flinkBatchEnv = new CollectionEnvironment();
+} else if ("[auto]".equals(masterUrl)) {
+  flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+} else if (masterUrl.matches(".*:\\d*")) {
+  String[] parts = masterUrl.split(":");
+  List stagingFiles = options.getFilesToStage();
 
 Review comment:
   
   Is it guaranteed that in the "auto" case we don't need to stage anything?
   
   It would be good if we staged stuff anyway, to make sure the code paths are 
always exercised.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96919)
Time Spent: 4h 50m  (was: 4h 40m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96914=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96914
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151701
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96917=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96917
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151706
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
 ##
 @@ -14,7 +14,7 @@
  * the License.
  */
 
-package org.apache.beam.runners.fnexecution.graph;
+package org.apache.beam.runners.fnexecution.wire;
 
 Review comment:
   
   I'm not sure you can just move this file, the Dataflow worker might depend 
on it - please confirm by running Dataflow ValidatesRunner tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96917)
Time Spent: 4.5h  (was: 4h 20m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96916=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96916
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151703
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96915=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96915
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151700
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96921=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96921
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151712
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the 
user-specified options.
+   */
+  public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
+
+LOG.info("Creating a Batch Execution Environment.");
+
+String masterUrl = options.getFlinkMaster();
+ExecutionEnvironment flinkBatchEnv;
+
+// depending on the master, create the right environment.
+if ("[local]".equals(masterUrl)) {
 
 Review comment:
   
   Are these standard Flink URLs or something we introduce for Beam testing 
purposes?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96921)
Time Spent: 5h 10m  (was: 5h)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96913=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96913
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151702
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96918=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96918
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 01:27
Start Date: 01/May/18 01:27
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185151709
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the 
user-specified options.
+   */
+  public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
+
+LOG.info("Creating a Batch Execution Environment.");
+
+String masterUrl = options.getFlinkMaster();
+ExecutionEnvironment flinkBatchEnv;
+
+// depending on the master, create the right environment.
+if ("[local]".equals(masterUrl)) {
+  flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+} else if ("[collection]".equals(masterUrl)) {
 
 Review comment:
   
   Does Beam make sense in this environment?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96918)
Time Spent: 4h 40m  (was: 4.5h)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96896=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96896
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 01/May/18 00:50
Start Date: 01/May/18 00:50
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-385571641
 
 
   The translator isn't wired and `FlinkJobServerDriver` isn't in master yet, 
is there a separate JIRA for that?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96896)
Time Spent: 3h 20m  (was: 3h 10m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96887=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96887
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 23:37
Start Date: 30/Apr/18 23:37
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185139455
 
 

 ##
 File path: runners/flink/build.gradle
 ##
 @@ -51,6 +51,7 @@ dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-core-java", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
+  shadow project(path: ":beam-runners-java-fn-execution", configuration: 
"shadow")
 
 Review comment:
   Needs to be added to pom.xml also?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96887)
Time Spent: 3h 10m  (was: 3h)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96886=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96886
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 23:32
Start Date: 30/Apr/18 23:32
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185138668
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96854=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96854
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 20:59
Start Date: 30/Apr/18 20:59
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-385526902
 
 
   All precommits appear to be passing now. I'm going to go ahead and clean up 
commit history.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96854)
Time Spent: 2h 50m  (was: 2h 40m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96846=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96846
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 20:37
Start Date: 30/Apr/18 20:37
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #5226: [BEAM-3972] 
Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-385521212
 
 
   LGTM. Will wait for builds to finish before approving.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96846)
Time Spent: 2h 40m  (was: 2.5h)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96836=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96836
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 20:18
Start Date: 30/Apr/18 20:18
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-385516112
 
 
   Rebased. PTAL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96836)
Time Spent: 2.5h  (was: 2h 20m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96834=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96834
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 20:07
Start Date: 30/Apr/18 20:07
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185095273
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/Componentses.java
 ##
 @@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution;
+
+import java.util.function.Predicate;
+
+/** Utilities for {@link 
org.apache.beam.model.pipeline.v1.RunnerApi.Components}. */
+public class Componentses {
+  // TODO: Remove this class and replace with SyntheticNodes.uniqueId once
+  // https://github.com/apache/beam/pull/4977 lands.
 
 Review comment:
   I've pulled in SyntheticNodes after rebasing. However, that class is 
package-private and is not quite named correctly for what we want. I've changed 
it to org.apache.beam.runners.core.construction.SyntheticComponents.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96834)
Time Spent: 2h 20m  (was: 2h 10m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96790=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96790
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 17:55
Start Date: 30/Apr/18 17:55
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185055889
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the 
user-specified options.
+   */
+  public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
 
 Review comment:
   That's a good idea. Unfortunately, we can't realistically mock or substitute 
our own `ExecutionEnvironment` because it is expected to actually perform 
pipeline execution for us. The default environment (as constructed here) 
actually runs a local Flink "mini cluster" when used in tests. We already use 
this for our Flink end-to-end tests, including the `ValidatesRunner` tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96790)
Time Spent: 2h  (was: 1h 50m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96791=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96791
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 17:55
Start Date: 30/Apr/18 17:55
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185059137
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96792=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96792
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 30/Apr/18 17:55
Start Date: 30/Apr/18 17:55
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r185059970
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95793=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95793
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 27/Apr/18 00:00
Start Date: 27/Apr/18 00:00
Worklog Time Spent: 10m 
  Work Description: jkff commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-384823663
 
 
   Looks like this also needs a rebase.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95793)
Time Spent: 1h 50m  (was: 1h 40m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95314=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95314
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 26/Apr/18 00:47
Start Date: 26/Apr/18 00:47
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184205121
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95315=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95315
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 26/Apr/18 00:47
Start Date: 26/Apr/18 00:47
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184221413
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 ##
 @@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the 
adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the 
user-specified options.
+   */
+  public static ExecutionEnvironment 
createBatchExecutionEnvironment(FlinkPipelineOptions options) {
 
 Review comment:
   Instead of using static methods, shall we use factory to create the 
environments. 
   Static methods are hard to swap and might create challenge while testing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95315)
Time Spent: 1h 40m  (was: 1.5h)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95242=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95242
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 20:45
Start Date: 25/Apr/18 20:45
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184195682
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95243=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95243
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 20:45
Start Date: 25/Apr/18 20:45
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184196551
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/** Interface for portable Flink translators. */
 
 Review comment:
   Expanded.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95243)
Time Spent: 1.5h  (was: 1h 20m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95241=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95241
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 20:45
Start Date: 25/Apr/18 20:45
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184195037
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95240=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95240
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 20:45
Start Date: 25/Apr/18 20:45
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184177308
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95196=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95196
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 19:15
Start Date: 25/Apr/18 19:15
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184169631
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95193=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95193
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 19:15
Start Date: 25/Apr/18 19:15
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184165603
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95194=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95194
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 19:15
Start Date: 25/Apr/18 19:15
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184167002
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 

[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95195=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95195
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 19:15
Start Date: 25/Apr/18 19:15
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5226: 
[BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#discussion_r184173167
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
 ##
 @@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/** Interface for portable Flink translators. */
 
 Review comment:
   Comment is not very helpful.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95195)
Time Spent: 50m  (was: 40m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95162=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95162
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 18:19
Start Date: 25/Apr/18 18:19
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-384385178
 
 
   R: @jkff @angoenka @tgroh 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95162)
Time Spent: 0.5h  (was: 20m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95161=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95161
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 18:19
Start Date: 25/Apr/18 18:19
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate 
portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226#issuecomment-384385005
 
 
   The `Componentses` class should go away in favor of `SyntheticNodes` once 
#4977 lands.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95161)
Time Spent: 20m  (was: 10m)

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95159=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95159
 ]

ASF GitHub Bot logged work on BEAM-3972:


Author: ASF GitHub Bot
Created on: 25/Apr/18 18:18
Start Date: 25/Apr/18 18:18
Worklog Time Spent: 10m 
  Work Description: bsidhom opened a new pull request #5226: [BEAM-3972] 
Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226
 
 
   Note that Flink does not provide a way to make assertions on the output 
graph shape and we can't yet execute ExecutableStages. The plan is to test 
translation end-to-end by ValidateRunner tests once the missing pieces are in.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [ ] Write a pull request description that is detailed enough to 
understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Run `./gradlew build` to make sure basic checks pass. A more thorough 
check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 95159)
Time Spent: 10m
Remaining Estimate: 0h

> Flink runner translates batch pipelines directly by proto
> -
>
> Key: BEAM-3972
> URL: https://issues.apache.org/jira/browse/BEAM-3972
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The non-portable runner uses reydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)