[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=98037=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98037 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 03/May/18 21:01 Start Date: 03/May/18 21:01 Worklog Time Spent: 10m Work Description: jkff closed pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
similarity index 87%
rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
index fc2cb3dc562..f7adf6de5e1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.beam.runners.core.construction.graph;
+package org.apache.beam.runners.core.construction;
 
 import java.util.function.Predicate;
 
 /**
- * A utility class to interact with synthetic {@link PipelineNode Pipeline Nodes}.
+ * A utility class to interact with synthetic pipeline components.
  */
-class SyntheticNodes {
-  private SyntheticNodes() {}
+public class SyntheticComponents {
+  private SyntheticComponents() {}
 
   /**
    * Generate an ID which does not collide with any existing ID, as determined by the input
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index ddc03355a90..df3aa5fdfb7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -30,6 +30,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
 /** A {@link Pipeline} which has been separated into collections of executable components. */
@@ -106,7 +107,7 @@ static FusedPipeline of(
       Set usedNames =
           Sets.union(topLevelTransforms.keySet(), getComponents().getTransformsMap().keySet());
       topLevelTransforms.put(
-          SyntheticNodes.uniqueId(baseName, usedNames::contains), stage.toPTransform());
+          SyntheticComponents.uniqueId(baseName, usedNames::contains), stage.toPTransform());
     }
     return topLevelTransforms;
   }
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
index 4419787ede1..45c4a27cdcf 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
@@ -39,6 +39,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
@@ -131,7 +132,7 @@ static DeduplicationResult ensureSingleProducer(
       PTransform flattenPartialPCollections =
           createFlattenOfPartials(partialFlattenTargets.getKey(), partialFlattenTargets.getValue());
       String flattenId =
-          SyntheticNodes.uniqueId("unzipped_flatten", unzippedComponents::containsTransforms);
+
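The javadoc in the renamed class says `uniqueId` generates an ID that does not collide with any existing ID, as determined by an input predicate. A minimal sketch of how such a helper could behave, assuming a numeric-suffix scheme; `UniqueIdSketch` is a hypothetical class and the `_N` suffix format is an assumption, not taken from the Beam source:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of collision-free ID generation, modeled on the call shape
 * uniqueId(baseName, usedNames::contains) seen in the diff. Not the Beam source.
 */
public class UniqueIdSketch {
  private UniqueIdSketch() {}

  /**
   * Returns baseName if the predicate says it is free; otherwise tries baseName_0,
   * baseName_1, ... and returns the first candidate the predicate does not report as taken.
   * The "_N" suffix format is an assumption made for this sketch.
   */
  public static String uniqueId(String baseName, Predicate<String> isExistingId) {
    String candidate = baseName;
    int suffix = 0;
    while (isExistingId.test(candidate)) {
      candidate = baseName + "_" + suffix;
      suffix++;
    }
    return candidate;
  }

  public static void main(String[] args) {
    Set<String> usedNames = new HashSet<>();
    usedNames.add("unzipped_flatten");
    usedNames.add("unzipped_flatten_0");

    // Same call shape as the diff: pass a method reference as the "already taken" predicate.
    System.out.println(uniqueId("unzipped_flatten", usedNames::contains)); // prints unzipped_flatten_1
    System.out.println(uniqueId("stage", usedNames::contains)); // prints stage
  }
}
```

Taking a predicate rather than a collection lets callers pass either a `Set` (`usedNames::contains`) or a proto accessor (`unzippedComponents::containsTransforms`), as the two call sites in the diff do.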
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97736=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97736 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 03/May/18 00:42 Start Date: 03/May/18 00:42 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185677199 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java ## @@ -14,7 +14,7 @@ * the License. */ -package org.apache.beam.runners.fnexecution.graph; +package org.apache.beam.runners.fnexecution.wire; Review comment: @jkff I just looked at the past runs under https://builds.apache.org/view/A-D/view/Beam/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/. It looks like _every_ recent run has either been aborted or failed due to worker timeout issues. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 97736) Time Spent: 8.5h (was: 8h 20m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 8.5h > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
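The issue description says the portable Flink runner must translate pipelines directly by proto rather than through rehydrated SDK objects. The sketch below illustrates the general shape of proto-driven translation under stated assumptions: look up each transform's URN and dispatch to a registered translator. `TransformProto`, `Translator`, and the string "plan" are hypothetical stand-ins (judging by its imports, the real `FlinkBatchPortablePipelineTranslator` builds Flink `DataSet` operations from `RunnerApi` protos), and the URN strings follow Beam's naming scheme but should be treated as illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of proto-driven translation: each transform is dispatched on its URN
 * rather than rehydrated into SDK objects. All types here are stand-ins, not Beam classes.
 */
public class UrnDispatchSketch {

  /** Stand-in for a transform entry in the pipeline proto: its ID and its URN. */
  public static final class TransformProto {
    final String id;
    final String urn;

    public TransformProto(String id, String urn) {
      this.id = id;
      this.urn = urn;
    }
  }

  /** Translation hook; a real runner would emit engine operators instead of strings. */
  interface Translator {
    void translate(TransformProto transform, List<String> plan);
  }

  private final Map<String, Translator> translatorsByUrn = new HashMap<>();

  public UrnDispatchSketch() {
    // URN strings follow Beam's naming scheme; treat them as illustrative.
    translatorsByUrn.put("beam:transform:impulse:v1", (t, plan) -> plan.add("impulse source for " + t.id));
    translatorsByUrn.put("beam:transform:group_by_key:v1", (t, plan) -> plan.add("group-by-key for " + t.id));
    translatorsByUrn.put("beam:runner:executable_stage:v1", (t, plan) -> plan.add("executable stage for " + t.id));
  }

  /** Translates transforms in the given order, assumed to be topological; rejects unknown URNs. */
  public List<String> translate(List<TransformProto> transformsInTopologicalOrder) {
    List<String> plan = new ArrayList<>();
    for (TransformProto transform : transformsInTopologicalOrder) {
      Translator translator = translatorsByUrn.get(transform.urn);
      if (translator == null) {
        throw new IllegalArgumentException(
            "Unknown transform URN " + transform.urn + " for " + transform.id);
      }
      translator.translate(transform, plan);
    }
    return plan;
  }

  public static void main(String[] args) {
    List<TransformProto> transforms =
        List.of(
            new TransformProto("read", "beam:transform:impulse:v1"),
            new TransformProto("stage_1", "beam:runner:executable_stage:v1"),
            new TransformProto("gbk", "beam:transform:group_by_key:v1"));
    new UrnDispatchSketch().translate(transforms).forEach(System.out::println);
  }
}
```

Failing fast on an unrecognized URN, rather than skipping it, keeps an unsupported transform from being silently dropped from the translated job.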
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97734=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97734 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 03/May/18 00:19 Start Date: 03/May/18 00:19 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185671288 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97733=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97733 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 03/May/18 00:19 Start Date: 03/May/18 00:19 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185654927 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,607 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97705=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97705 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 02/May/18 22:04 Start Date: 02/May/18 22:04 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185631769 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java ## @@ -14,7 +14,7 @@ * the License. */ -package org.apache.beam.runners.fnexecution.graph; +package org.apache.beam.runners.fnexecution.wire; Review comment: The worker shouldn't depend on this in the staged SDK - so this will be fine. Issue Time Tracking --- Worklog Id: (was: 97705) Time Spent: 7h 50m (was: 7h 40m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97706=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97706 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 02/May/18 22:04 Start Date: 02/May/18 22:04 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185652571 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97704=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97704 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 02/May/18 22:04 Start Date: 02/May/18 22:04 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185632348 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,607 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97679=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97679 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 02/May/18 19:55 Start Date: 02/May/18 19:55 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-386100660 Run Dataflow ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 97679) Time Spent: 7h 40m (was: 7.5h)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97620=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97620 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 02/May/18 18:07 Start Date: 02/May/18 18:07 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-386069062 Run Dataflow ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 97620) Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97576=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97576 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 02/May/18 16:44 Start Date: 02/May/18 16:44 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-386043265 Run Dataflow ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 97576) Time Spent: 7h 20m (was: 7h 10m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97291=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97291 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 23:24 Start Date: 01/May/18 23:24 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-385819389 Run Dataflow ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 97291) Time Spent: 7h 10m (was: 7h)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97290=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97290 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 23:23 Start Date: 01/May/18 23:23 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185359755 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ +public class FlinkExecutionEnvironments { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + + /** + * If the submitted job is a batch processing job, this method creates the adequate Flink {@link + * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options. + */ + public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { + +LOG.info("Creating a Batch Execution Environment."); + +String masterUrl = options.getFlinkMaster(); +ExecutionEnvironment flinkBatchEnv; + +// depending on the master, create the right environment. +if ("[local]".equals(masterUrl)) { Review comment: This is something we do in Beam. It's what was done pre-portability. I think we actually want to use a completely different code path in the portable runner. It's the responsibility of whoever spins up the JobService to give Flink the correct execution environment. In any case, this should not be based on user pipeline options but runner startup configuration options. Issue Time Tracking --- Worklog Id: (was: 97290) Time Spent: 7h (was: 6h 50m)
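In the quoted code, `createBatchExecutionEnvironment` dispatches on the user-supplied `flinkMaster` string: `[local]`, `[collection]`, `[auto]`, or anything matching `.*:\\d*`, which is treated as a `host:port` address. A minimal sketch of only that dispatch, written as a hypothetical standalone helper (`FlinkMasterKind` and its `UNKNOWN` fallback are illustrative names, not Beam or Flink API), takes a plain string and creates no Flink environment:

```java
/**
 * Hypothetical helper (not Beam API) that classifies a Flink master string the same way
 * the quoted createBatchExecutionEnvironment branches do. It creates no environment.
 */
final class FlinkMasterKind {
  enum Kind { LOCAL, COLLECTION, AUTO, REMOTE, UNKNOWN }

  private FlinkMasterKind() {}

  static Kind classify(String masterUrl) {
    if ("[local]".equals(masterUrl)) {
      return Kind.LOCAL; // quoted code: ExecutionEnvironment.createLocalEnvironment()
    } else if ("[collection]".equals(masterUrl)) {
      return Kind.COLLECTION; // quoted code: new CollectionEnvironment()
    } else if ("[auto]".equals(masterUrl)) {
      return Kind.AUTO; // quoted code: ExecutionEnvironment.getExecutionEnvironment()
    } else if (masterUrl != null && masterUrl.matches(".*:\\d*")) {
      return Kind.REMOTE; // quoted code: splits the string as host:port
    }
    // Assumed fallback for this sketch; the quoted excerpt does not show the final else branch.
    return Kind.UNKNOWN;
  }
}
```

Keeping the decision a pure function of a string is one way to follow the reviewer's point: a portable job service could feed it from its own startup configuration instead of from user `PipelineOptions`.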
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97264=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97264 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185336352 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; 
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; +import org.apache.beam.runners.fnexecution.wire.WireCoders; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97263=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97263 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185347003 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ [...] + public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { + +LOG.info("Creating a Batch Execution Environment."); + +String masterUrl = options.getFlinkMaster(); +ExecutionEnvironment flinkBatchEnv; + +// depending on the master, create the right environment. +if ("[local]".equals(masterUrl)) { + flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); +} else if ("[collection]".equals(masterUrl)) { + flinkBatchEnv = new CollectionEnvironment(); +} else if ("[auto]".equals(masterUrl)) { + flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); +} else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List stagingFiles = options.getFilesToStage(); Review comment: In the case of auto, the runner is either executing locally (i.e., all the jars are already available on the current classpath) or the job is submitted via `flink run`. In the `flink run` case, you pass staging files to the `flink` preparation script and that is done automatically. In any case, we will likely not want this in the portable runner. The runner/job service should not have its classpath polluted by user code. Instead, the files should only be staged via the DistributedCache. I might revert this change and just have the portable runner use its own `ExecutionEnvironment` construction semantics. I think that makes sense here. Issue Time Tracking --- Worklog Id: (was: 97263) Time Spent: 5h 50m (was: 5h 40m)
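The remote branch quoted above accepts any string matching `.*:\\d*` and then splits it on `:`. Because `\\d*` also matches an empty port, an address such as `jobmanager:` passes the check, and `"jobmanager:".split(":")` returns a single element because Java's `String.split` drops trailing empty strings. The excerpt does not show what happens next, so the following is only a minimal sketch of stricter parsing, assuming a hypothetical `FlinkMasterAddress` holder (not part of Beam or Flink) that splits at the last colon and validates the port before any environment would be created:

```java
/** Hypothetical holder for a parsed "host:port" Flink master address (illustrative, not Beam API). */
final class FlinkMasterAddress {
  final String host;
  final int port;

  private FlinkMasterAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  /** Parses "host:port", rejecting an empty host, an empty port, or a port outside 1..65535. */
  static FlinkMasterAddress parse(String masterUrl) {
    int colon = masterUrl.lastIndexOf(':');
    // colon <= 0: no colon at all, or nothing before it (empty host).
    // colon at the last index: empty port, which ".*:\\d*" would still accept.
    if (colon <= 0 || colon == masterUrl.length() - 1) {
      throw new IllegalArgumentException("Expected host:port but got: " + masterUrl);
    }
    String host = masterUrl.substring(0, colon);
    int port;
    try {
      port = Integer.parseInt(masterUrl.substring(colon + 1));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid port in: " + masterUrl, e);
    }
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("Port out of range in: " + masterUrl);
    }
    return new FlinkMasterAddress(host, port);
  }
}
```

Splitting at the last colon instead of calling `split(":")` keeps a missing port detectable as an explicit error rather than a one-element array.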
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97268=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97268 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185344604 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ [...] + +/** Utilities for Flink execution environments. */ Review comment: Just a code move. I wish there were a way to indicate this to git. Issue Time Tracking --- Worklog Id: (was: 97268) Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97261=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97261 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185335028 ## File path: runners/flink/build.gradle ## @@ -51,6 +51,7 @@ dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow project(path: ":beam-runners-core-java", configuration: "shadow") shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow") + shadow project(path: ":beam-runners-java-fn-execution", configuration: "shadow") Review comment: I'm not sure what the status is of keeping the pom files updated. We don't appear to be running the maven precommits any longer. Issue Time Tracking --- Worklog Id: (was: 97261) Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97270=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97270 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185342137 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ [...]
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97267=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97267 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185340333 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ [...]
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97266=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97266 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185346003 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ +public class FlinkExecutionEnvironments { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + + /** + * If the submitted job is a batch processing job, this method creates the adequate Flink {@link + * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options. + */ + public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { + +LOG.info("Creating a Batch Execution Environment."); + +String masterUrl = options.getFlinkMaster(); +ExecutionEnvironment flinkBatchEnv; + +// depending on the master, create the right environment. +if ("[local]".equals(masterUrl)) { + flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); +} else if ("[collection]".equals(masterUrl)) { Review comment: What do you mean? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 97266) Time Spent: 6h 20m (was: 6h 10m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 6h 20m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
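The `createBatchExecutionEnvironment` hunk quoted above chooses an environment by matching the Flink master string against `[local]`, `[collection]`, `[auto]`, and a `host:port` pattern. The following is a minimal, Flink-free sketch of that dispatch, assuming a hypothetical `FlinkMasterModes` class and `Mode` enum that do not exist in the PR. The exception for unrecognized strings is also an assumption, because the quoted hunk is cut off before any fallback branch. Unlike the quoted `.*:\\d*`, the sketch's pattern requires at least one port digit.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch: classify a Flink master string like the quoted diff branches on it. */
public class FlinkMasterModes {

  /** Modes mirroring the branches in createBatchExecutionEnvironment (names are assumptions). */
  enum Mode { LOCAL, COLLECTION, AUTO, REMOTE }

  // Requires at least one port digit; the quoted diff's ".*:\\d*" would also accept "host:".
  private static final Pattern HOST_PORT = Pattern.compile(".*:\\d+");

  static Mode classify(String masterUrl) {
    if ("[local]".equals(masterUrl)) {
      return Mode.LOCAL;
    } else if ("[collection]".equals(masterUrl)) {
      return Mode.COLLECTION;
    } else if ("[auto]".equals(masterUrl)) {
      return Mode.AUTO;
    } else if (masterUrl != null && HOST_PORT.matcher(masterUrl).matches()) {
      return Mode.REMOTE;
    }
    // Assumed behavior: reject anything unrecognized instead of silently falling back.
    throw new IllegalArgumentException("Unsupported Flink master: " + masterUrl);
  }

  public static void main(String[] args) {
    System.out.println(classify("[local]"));
    System.out.println(classify("jobmanager:6123"));
  }
}
```

Keeping the string-to-mode decision separate from environment construction makes it easy to unit-test without starting Flink.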
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97262=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97262 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185335332 ## File path: runners/flink/build.gradle ## @@ -51,6 +51,7 @@ dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow project(path: ":beam-runners-core-java", configuration: "shadow") shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow") + shadow project(path: ":beam-runners-java-fn-execution", configuration: "shadow") Review comment: I've gone ahead and added it for now anyway, but it would be good to know what's going on here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 97262) Time Spent: 5h 40m (was: 5.5h) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 5h 40m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97260=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97260 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185338177 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; 
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; +import org.apache.beam.runners.fnexecution.wire.WireCoders; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97269=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97269 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185344328 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97265=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97265 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 22:06 Start Date: 01/May/18 22:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185343381 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=97225=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97225 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 20:58 Start Date: 01/May/18 20:58 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-385788230 Run Dataflow ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 97225) Time Spent: 5h 20m (was: 5h 10m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 5h 20m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
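The issue description says the portable runner must "translate pipelines directly by proto" instead of using rehydrated pipelines. Concretely, each transform is selected by the URN string stored in its proto and not by a Java transform class. The following is a minimal sketch of that dispatch idea, assuming a hypothetical `UrnDispatchSketch` class. The three URNs are standard Beam URNs, but the string "translations" are placeholders and do not reflect what `FlinkBatchPortablePipelineTranslator` actually emits.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch: dispatch on each transform's URN (as read from the pipeline proto)
 * instead of on a rehydrated Java transform class. Translations are placeholder strings.
 */
public class UrnDispatchSketch {

  // Maps a transform URN to a function from transform ID to a placeholder translation.
  private final Map<String, Function<String, String>> translators = new HashMap<>();

  public UrnDispatchSketch() {
    translators.put("beam:transform:impulse:v1", id -> "impulse source for " + id);
    translators.put("beam:transform:group_by_key:v1", id -> "grouping for " + id);
    translators.put("beam:runner:executable_stage:v1", id -> "SDK-harness stage for " + id);
  }

  /** Translates one transform, failing loudly on URNs this sketch does not know. */
  public String translate(String transformId, String urn) {
    Function<String, String> translator = translators.get(urn);
    if (translator == null) {
      throw new IllegalArgumentException("Unknown transform URN: " + urn);
    }
    return translator.apply(transformId);
  }

  public static void main(String[] args) {
    UrnDispatchSketch sketch = new UrnDispatchSketch();
    System.out.println(sketch.translate("read", "beam:transform:impulse:v1"));
  }
}
```

Because the lookup key is a plain string from the proto, the runner does not need to deserialize any language-specific transform in order to decide how to translate it.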
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96920=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96920 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151713 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ Review comment: Is this file just a code move or is there more to it? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 96920) Time Spent: 5h (was: 4h 50m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 5h > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96911=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96911 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151698 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96912=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96912 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151699 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96919=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96919 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151710 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ +public class FlinkExecutionEnvironments { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + + /** + * If the submitted job is a batch processing job, this method creates the adequate Flink {@link + * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options. + */ + public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { + +LOG.info("Creating a Batch Execution Environment."); + +String masterUrl = options.getFlinkMaster(); +ExecutionEnvironment flinkBatchEnv; + +// depending on the master, create the right environment. +if ("[local]".equals(masterUrl)) { + flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); +} else if ("[collection]".equals(masterUrl)) { + flinkBatchEnv = new CollectionEnvironment(); +} else if ("[auto]".equals(masterUrl)) { + flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); +} else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); Review comment: Is it guaranteed that in the "auto" case we don't need to stage anything? It would be good if we staged stuff anyway, to make sure the code paths are always exercised. This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 96919) Time Spent: 4h 50m (was: 4h 40m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 4h 50m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines, which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
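The quoted `createBatchExecutionEnvironment` branch dispatches on the `flinkMaster` option: three bracketed sentinels (`[local]`, `[collection]`, `[auto]`) and a `host:port` form. The self-contained Java sketch below (hypothetical class `MasterUrlSketch`, no Flink dependency) reproduces only that string classification so it can be unit-tested in isolation. One difference is deliberate: the quoted regex `.*:\\d*` also accepts a value such as `jobmanager:` with an empty port, and `"jobmanager:".split(":")` then yields a single element, so this sketch uses the stricter pattern `(.+):(\\d+)`. How the diff actually parses the port, and what it does with unrecognized values, is cut off above; both are assumptions here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch (not Beam code): classifies a Flink master string the way the quoted
 * createBatchExecutionEnvironment branches do, without creating any Flink environment.
 */
public class MasterUrlSketch {

  /** Which kind of Flink batch environment the string selects. */
  enum Kind { LOCAL, COLLECTION, AUTO, REMOTE }

  /** Result of classification; host and port are only meaningful for REMOTE. */
  static final class Target {
    final Kind kind;
    final String host;
    final int port;

    Target(Kind kind, String host, int port) {
      this.kind = kind;
      this.host = host;
      this.port = port;
    }
  }

  // Stricter than the quoted ".*:\\d*": requires a non-empty host and at least one port digit.
  private static final Pattern HOST_PORT = Pattern.compile("(.+):(\\d+)");

  static Target classify(String masterUrl) {
    if ("[local]".equals(masterUrl)) {
      return new Target(Kind.LOCAL, null, -1);
    } else if ("[collection]".equals(masterUrl)) {
      return new Target(Kind.COLLECTION, null, -1);
    } else if ("[auto]".equals(masterUrl)) {
      return new Target(Kind.AUTO, null, -1);
    }
    Matcher m = HOST_PORT.matcher(masterUrl);
    if (m.matches()) {
      // Assumption: the text after the last colon is the port (greedy group 1 keeps the rest).
      // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on overflow.
      int port = Integer.parseInt(m.group(2));
      if (port > 65535) {
        throw new IllegalArgumentException("Port out of range: " + masterUrl);
      }
      return new Target(Kind.REMOTE, m.group(1), port);
    }
    // Choice made by this sketch only; the diff's own fallback is not shown above.
    throw new IllegalArgumentException("Unrecognized Flink master: " + masterUrl);
  }
}
```

Keeping the classification free of Flink types makes it easy to test; a runner would then map each `Kind` onto the calls the diff already uses (`ExecutionEnvironment.createLocalEnvironment()`, `new CollectionEnvironment()`, `ExecutionEnvironment.getExecutionEnvironment()`), with `REMOTE` presumably going to Flink's `ExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles...)` together with the files to stage.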
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96914=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96914 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151701 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96917=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96917 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151706 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java ## @@ -14,7 +14,7 @@ * the License. */ -package org.apache.beam.runners.fnexecution.graph; +package org.apache.beam.runners.fnexecution.wire; Review comment: I'm not sure you can just move this file; the Dataflow worker might depend on it. Please confirm by running the Dataflow ValidatesRunner tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 96917) Time Spent: 4.5h (was: 4h 20m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 4.5h > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines, which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96916=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96916 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151703 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96915=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96915 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151700 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96921=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96921 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151712 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ +public class FlinkExecutionEnvironments { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + + /** + * If the submitted job is a batch processing job, this method creates the adequate Flink {@link + * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options. + */ + public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { + +LOG.info("Creating a Batch Execution Environment."); + +String masterUrl = options.getFlinkMaster(); +ExecutionEnvironment flinkBatchEnv; + +// depending on the master, create the right environment. +if ("[local]".equals(masterUrl)) { Review comment: Are these standard Flink URLs or something we introduce for Beam testing purposes? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 96921) Time Spent: 5h 10m (was: 5h) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines, which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96913=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96913 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151702 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96918=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96918 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 01:27 Start Date: 01/May/18 01:27 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185151709 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ +public class FlinkExecutionEnvironments { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + + /** + * If the submitted job is a batch processing job, this method creates the adequate Flink {@link + * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options. + */ + public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { + +LOG.info("Creating a Batch Execution Environment."); + +String masterUrl = options.getFlinkMaster(); +ExecutionEnvironment flinkBatchEnv; + +// depending on the master, create the right environment. +if ("[local]".equals(masterUrl)) { + flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); +} else if ("[collection]".equals(masterUrl)) { Review comment: Does Beam make sense in this environment? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 96918) Time Spent: 4h 40m (was: 4.5h) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 4h 40m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
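The quoted `createBatchExecutionEnvironment` chooses a Flink environment by matching the `flinkMaster` option against the special strings `[local]` and `[collection]`; the notification cuts the method off before its remaining branches. The sketch below isolates that dispatch in plain Java so it can be read and tested without Flink. The `MasterKind` enum, the `classify` helper, the `host:port` remote branch and the fallback to `AUTO` are assumptions for illustration, not the code under review.

```java
public class MasterUrlSketch {

  /** Hypothetical classification of the flinkMaster option; not part of Beam or Flink. */
  enum MasterKind { LOCAL, COLLECTION, REMOTE, AUTO }

  /** Mirrors the quoted if/else chain; every branch after "[collection]" is an assumption. */
  static MasterKind classify(String masterUrl) {
    if ("[local]".equals(masterUrl)) {
      // In the quoted code this branch calls ExecutionEnvironment.createLocalEnvironment().
      return MasterKind.LOCAL;
    } else if ("[collection]".equals(masterUrl)) {
      // The quoted code imports CollectionEnvironment, presumably for this branch.
      return MasterKind.COLLECTION;
    } else if (masterUrl != null && masterUrl.matches("[^:]+:\\d+")) {
      // Assumed: a "host:port" string selects a remote cluster.
      return MasterKind.REMOTE;
    }
    // Assumed fallback for "[auto]", null, or anything unrecognized.
    return MasterKind.AUTO;
  }

  public static void main(String[] args) {
    for (String master : new String[] {"[local]", "[collection]", "jobmanager:6123", "[auto]"}) {
      System.out.println(master + " -> " + classify(master));
    }
  }
}
```

Running `main` prints one line per input, for example `[local] -> LOCAL` and `jobmanager:6123 -> REMOTE`. Keeping the string matching separate from environment construction is one way to unit-test the option handling, since (as noted later in this thread) the environment itself cannot realistically be mocked.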
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96896=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96896 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 01/May/18 00:50 Start Date: 01/May/18 00:50 Worklog Time Spent: 10m Work Description: tweise commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-385571641 The translator isn't wired and `FlinkJobServerDriver` isn't in master yet, is there a separate JIRA for that? Issue Time Tracking --- Worklog Id: (was: 96896) Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96887=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96887 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 23:37 Start Date: 30/Apr/18 23:37 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185139455 ## File path: runners/flink/build.gradle ## @@ -51,6 +51,7 @@ dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow project(path: ":beam-runners-core-java", configuration: "shadow") shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow") + shadow project(path: ":beam-runners-java-fn-execution", configuration: "shadow") Review comment: Needs to be added to pom.xml also? Issue Time Tracking --- Worklog Id: (was: 96887) Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96886=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96886 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 23:32 Start Date: 30/Apr/18 23:32 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185138668 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; 
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; +import org.apache.beam.runners.fnexecution.wire.WireCoders; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import
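Per the issue description, the portable runner translates the pipeline proto directly instead of a rehydrated SDK pipeline; the `FlinkBatchPortablePipelineTranslator` diff above is truncated after its imports, which include `PTransformTranslation` and `QueryablePipeline`. One common shape for such a translator is a table keyed by transform URN, so each proto transform is routed to a handler without reconstructing SDK objects. The sketch below shows only that dispatch idea in plain Java. `UrnDispatchSketch`, `TransformProto` and the handler bodies are hypothetical stand-ins for the Runner API protos and Flink `DataSet` operations, and the URN strings are illustrative rather than taken from this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class UrnDispatchSketch {

  /** Hypothetical stand-in for a RunnerApi.PTransform: only an id and a URN. */
  static final class TransformProto {
    final String id;
    final String urn;

    TransformProto(String id, String urn) {
      this.id = id;
      this.urn = urn;
    }
  }

  private final Map<String, Consumer<TransformProto>> translators = new HashMap<>();

  /** Records what was "translated" so the dispatch order can be observed. */
  final List<String> log = new ArrayList<>();

  UrnDispatchSketch() {
    // Illustrative URNs; a real translator would use constants such as those in
    // PTransformTranslation and would build Flink DataSets instead of logging.
    translators.put("beam:transform:impulse:v1", t -> log.add("impulse " + t.id));
    translators.put("beam:transform:group_by_key:v1", t -> log.add("gbk " + t.id));
    translators.put("beam:runner:executable_stage:v1", t -> log.add("stage " + t.id));
  }

  /** Translates transforms in the given (assumed topological) order, failing on unknown URNs. */
  void translate(List<TransformProto> topologicallyOrdered) {
    for (TransformProto t : topologicallyOrdered) {
      Consumer<TransformProto> translator = translators.get(t.urn);
      if (translator == null) {
        throw new IllegalArgumentException("Unknown transform URN: " + t.urn + " for " + t.id);
      }
      translator.accept(t);
    }
  }

  public static void main(String[] args) {
    UrnDispatchSketch sketch = new UrnDispatchSketch();
    sketch.translate(Arrays.asList(
        new TransformProto("read", "beam:transform:impulse:v1"),
        new TransformProto("fused", "beam:runner:executable_stage:v1"),
        new TransformProto("group", "beam:transform:group_by_key:v1")));
    System.out.println(sketch.log); // [impulse read, stage fused, gbk group]
  }
}
```

Failing loudly on an unknown URN, rather than skipping it, is a deliberate choice in this sketch: a proto-level translator that silently drops transforms would produce a wrong pipeline instead of an error.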
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96854=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96854 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 20:59 Start Date: 30/Apr/18 20:59 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-385526902 All precommits appear to be passing now. I'm going to go ahead and clean up commit history. Issue Time Tracking --- Worklog Id: (was: 96854) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96846=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96846 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 20:37 Start Date: 30/Apr/18 20:37 Worklog Time Spent: 10m Work Description: angoenka commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-385521212 LGTM. Will wait for builds to finish before approving. Issue Time Tracking --- Worklog Id: (was: 96846) Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96836=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96836 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 20:18 Start Date: 30/Apr/18 20:18 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-385516112 Rebased. PTAL. Issue Time Tracking --- Worklog Id: (was: 96836) Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96834=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96834 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 20:07 Start Date: 30/Apr/18 20:07 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185095273 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/Componentses.java ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution; + +import java.util.function.Predicate; + +/** Utilities for {@link org.apache.beam.model.pipeline.v1.RunnerApi.Components}. */ +public class Componentses { + // TODO: Remove this class and replace with SyntheticNodes.uniqueId once + // https://github.com/apache/beam/pull/4977 lands. Review comment: I've pulled in SyntheticNodes after rebasing. However, that class is package-private and is not quite named correctly for what we want. 
I've changed it to org.apache.beam.runners.core.construction.SyntheticComponents. Issue Time Tracking --- Worklog Id: (was: 96834) Time Spent: 2h 20m (was: 2h 10m)
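The thread moves ID generation out of `Componentses` and into `SyntheticComponents` (renamed from the package-private `SyntheticNodes`), whose Javadoc describes generating an ID that does not collide with any existing ID, as determined by an input predicate. The sketch below illustrates that contract under assumptions: the `uniqueId(String, Predicate<String>)` signature and the numeric-suffix scheme are hypothetical and may differ from the real class.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

public class UniqueIdSketch {

  /**
   * Returns prefix + n for the smallest n >= 0 that the predicate does not report as taken.
   * Sketch only: the real SyntheticComponents signature and naming scheme may differ.
   */
  static String uniqueId(String prefix, Predicate<String> isTaken) {
    int suffix = 0;
    String candidate = prefix + suffix;
    while (isTaken.test(candidate)) {
      suffix++;
      candidate = prefix + suffix;
    }
    return candidate;
  }

  public static void main(String[] args) {
    Set<String> existing = new HashSet<>();
    existing.add("pcollection0");
    existing.add("pcollection1");
    // existing::contains acts as the predicate: an ID is taken if it is already in the set.
    System.out.println(uniqueId("pcollection", existing::contains)); // pcollection2
  }
}
```

Accepting a `Predicate` rather than a concrete `Components` proto would let the same helper check any ID namespace (PCollections, coders, transforms), which may be part of why a general "components" utility was preferred over a graph-node one.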
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96790=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96790 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 17:55 Start Date: 30/Apr/18 17:55 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185055889 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@ + public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { Review comment: That's a good idea. Unfortunately, we can't realistically mock or substitute our own `ExecutionEnvironment` because it is expected to actually perform pipeline execution for us. The default environment (as constructed here) actually runs a local Flink "mini cluster" when used in tests. We already use this for our Flink end-to-end tests, including the `ValidatesRunner` tests. Issue Time Tracking --- Worklog Id: (was: 96790) Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96791=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96791 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 17:55 Start Date: 30/Apr/18 17:55 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185059137 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=96792=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96792 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 30/Apr/18 17:55 Start Date: 30/Apr/18 17:55 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r185059970 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95793=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95793 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 27/Apr/18 00:00 Start Date: 27/Apr/18 00:00 Worklog Time Spent: 10m Work Description: jkff commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-384823663 Looks like this also needs a rebase. Issue Time Tracking --- Worklog Id: (was: 95793) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95314=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95314 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 26/Apr/18 00:47 Start Date: 26/Apr/18 00:47 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184205121 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; 
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; +import org.apache.beam.runners.fnexecution.wire.WireCoders; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95315=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95315 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 26/Apr/18 00:47 Start Date: 26/Apr/18 00:47 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184221413 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java ## @@ -0,0 +1,174 @@
+package org.apache.beam.runners.flink; + +import java.util.List; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ +public class FlinkExecutionEnvironments { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + + /** + * If the submitted job is a batch processing job, this method creates the adequate Flink {@link + * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options. + */ + public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) { Review comment: Instead of using static methods, shall we use a factory to create the environments? Static methods are hard to swap and might create challenges during testing. Issue Time Tracking --- Worklog Id: (was: 95315) Time Spent: 1h 40m (was: 1.5h) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly.
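The reviewer's suggestion can be sketched as follows: the environment choice sits behind an injectable factory interface, so a test can pass a fake instead of stubbing a static method. Everything here is hypothetical. BatchEnvironment, RunnerOptions, BatchEnvironmentFactory, DefaultBatchEnvironmentFactory, and BatchJobLauncher are stand-ins rather than Flink or Beam classes, and the master-string handling only assumes the Flink runner's convention of a host:port address or one of the special values [local], [collection], or [auto].

```java
import java.util.Objects;

// Hypothetical stand-in for a Flink batch ExecutionEnvironment.
interface BatchEnvironment {
  String describe();
}

// Hypothetical options holder; only the master address is modeled.
final class RunnerOptions {
  final String flinkMaster;

  RunnerOptions(String flinkMaster) {
    this.flinkMaster = Objects.requireNonNull(flinkMaster);
  }
}

// The injectable seam suggested in the review, replacing a static creator method.
interface BatchEnvironmentFactory {
  BatchEnvironment create(RunnerOptions options);
}

// Default factory: picks an environment kind from the (assumed) master-string convention.
final class DefaultBatchEnvironmentFactory implements BatchEnvironmentFactory {
  @Override
  public BatchEnvironment create(RunnerOptions options) {
    String master = options.flinkMaster;
    switch (master) {
      case "[local]":
        return () -> "local";
      case "[collection]":
        return () -> "collection";
      case "[auto]":
        return () -> "auto";
      default:
        // Anything else is treated as a remote host:port address.
        return () -> "remote:" + master;
    }
  }
}

// Receives the factory through its constructor, so tests can swap in a fake.
final class BatchJobLauncher {
  private final BatchEnvironmentFactory factory;

  BatchJobLauncher(BatchEnvironmentFactory factory) {
    this.factory = factory;
  }

  String launch(RunnerOptions options) {
    return factory.create(options).describe();
  }
}
```

The cost is one extra constructor parameter; in exchange, tests control which environment is created without touching global state.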
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95242=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95242 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 20:45 Start Date: 25/Apr/18 20:45 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184195682 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,598 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95243=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95243 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 20:45 Start Date: 25/Apr/18 20:45 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184196551 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java ## @@ -0,0 +1,33 @@ +package org.apache.beam.runners.flink; + +import org.apache.beam.model.pipeline.v1.RunnerApi; + +/** Interface for portable Flink translators. */ Review comment: Expanded.
Issue Time Tracking --- Worklog Id: (was: 95243) Time Spent: 1.5h (was: 1h 20m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly.
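One way to shape a translator interface for portable pipelines is to parameterize it by its own translation-context type, so batch and streaming implementations can each carry mode-specific state. This is a hedged sketch, not the interface from the PR: PortableTranslator, PipelineProto, and ToyBatchTranslator are hypothetical, and PipelineProto stands in for RunnerApi.Pipeline by holding only a list of transform IDs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RunnerApi.Pipeline: only transform IDs are modeled.
final class PipelineProto {
  final List<String> transformIds;

  PipelineProto(List<String> transformIds) {
    this.transformIds = transformIds;
  }
}

// Translator interface parameterized by its own context type.
interface PortableTranslator<T extends PortableTranslator.Context> {
  // Minimal state shared by every translation context.
  interface Context {
    String jobName();
  }

  T createTranslationContext(String jobName);

  void translate(T context, PipelineProto pipeline);
}

// Toy batch implementation whose context collects the IDs it translated.
final class ToyBatchTranslator implements PortableTranslator<ToyBatchTranslator.BatchContext> {
  static final class BatchContext implements PortableTranslator.Context {
    private final String jobName;
    final List<String> translated = new ArrayList<>();

    BatchContext(String jobName) {
      this.jobName = jobName;
    }

    @Override
    public String jobName() {
      return jobName;
    }
  }

  @Override
  public BatchContext createTranslationContext(String jobName) {
    return new BatchContext(jobName);
  }

  @Override
  public void translate(BatchContext context, PipelineProto pipeline) {
    // A real batch translator would build DataSet operators here; the toy just records IDs.
    context.translated.addAll(pipeline.transformIds);
  }
}
```

Because the context type is a type parameter, a streaming implementation could use a different context class without casts at the call site.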
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95241=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95241 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 20:45 Start Date: 25/Apr/18 20:45 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184195037 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,598 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95240=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95240 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 20:45 Start Date: 25/Apr/18 20:45 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184177308 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,598 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95196=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95196 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 19:15 Start Date: 25/Apr/18 19:15 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184169631 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,598 @@
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95193=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95193 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 19:15 Start Date: 25/Apr/18 19:15 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184165603 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; 
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; +import org.apache.beam.runners.fnexecution.wire.WireCoders; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95194=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95194 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 19:15 Start Date: 25/Apr/18 19:15 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184167002 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; +import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; 
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; +import org.apache.beam.runners.fnexecution.wire.WireCoders; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95195=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95195 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 19:15 Start Date: 25/Apr/18 19:15 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#discussion_r184173167 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.model.pipeline.v1.RunnerApi; + +/** Interface for portable Flink translators. */ Review comment: Comment is not very helpful. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95195) Time Spent: 50m (was: 40m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
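The review above flags the one-line Javadoc on `FlinkPortablePipelineTranslator`. To illustrate roughly what a portable translator does (walk the transforms of a pipeline proto and dispatch each one on its URN), here is a minimal, self-contained Java sketch. It assumes a URN-keyed dispatch table; `UrnDispatchSketch`, `TransformSpec`, `TransformTranslator`, and the string "plan" are hypothetical stand-ins, not Beam or Flink APIs. The URN strings follow Beam's naming scheme, while the Flink operation names they map to are illustrative placeholders only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of URN-based dispatch in a portable translator; not Beam's actual code. */
public class UrnDispatchSketch {

  /** Stand-in for a RunnerApi.PTransform proto: a transform ID plus its spec URN. */
  static final class TransformSpec {
    final String id;
    final String urn;

    TransformSpec(String id, String urn) {
      this.id = id;
      this.urn = urn;
    }
  }

  /** Translates one transform; engine operations are recorded as strings in this sketch. */
  interface TransformTranslator {
    void translate(TransformSpec transform, List<String> plan);
  }

  private final Map<String, TransformTranslator> translators = new LinkedHashMap<>();

  UrnDispatchSketch() {
    // URNs modeled on Beam's naming scheme; the recorded operation names are placeholders.
    translators.put("beam:transform:impulse:v1", (t, plan) -> plan.add("source:" + t.id));
    translators.put("beam:transform:group_by_key:v1", (t, plan) -> plan.add("groupReduce:" + t.id));
    translators.put("beam:runner:executable_stage:v1", (t, plan) -> plan.add("mapPartition:" + t.id));
  }

  /** Visits transforms (assumed already topologically ordered) and dispatches on each URN. */
  List<String> translate(List<TransformSpec> orderedTransforms) {
    List<String> plan = new ArrayList<>();
    for (TransformSpec transform : orderedTransforms) {
      TransformTranslator translator = translators.get(transform.urn);
      if (translator == null) {
        // Fail fast on transforms this translator does not know how to handle.
        throw new IllegalArgumentException("Unknown transform URN: " + transform.urn);
      }
      translator.translate(transform, plan);
    }
    return plan;
  }

  public static void main(String[] args) {
    List<TransformSpec> transforms = List.of(
        new TransformSpec("read", "beam:transform:impulse:v1"),
        new TransformSpec("stage1", "beam:runner:executable_stage:v1"),
        new TransformSpec("gbk", "beam:transform:group_by_key:v1"));
    // prints "[source:read, mapPartition:stage1, groupReduce:gbk]"
    System.out.println(new UrnDispatchSketch().translate(transforms));
  }
}
```

Keying translators by URN rather than by Java transform class is what lets a translator work from the proto alone, without rehydrating SDK objects; unknown URNs are rejected rather than silently skipped.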
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95162=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95162 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 18:19 Start Date: 25/Apr/18 18:19 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-384385178 R: @jkff @angoenka @tgroh Issue Time Tracking --- Worklog Id: (was: 95162) Time Spent: 0.5h (was: 20m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly.
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95161=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95161 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 18:19 Start Date: 25/Apr/18 18:19 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226#issuecomment-384385005 The `Componentses` class should go away in favor of `SyntheticNodes` once #4977 lands. Issue Time Tracking --- Worklog Id: (was: 95161) Time Spent: 20m (was: 10m) > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly.
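For context on the `SyntheticNodes` utility mentioned above (renamed to `SyntheticComponents` in the merged diff), its Javadoc describes generating an ID which does not collide with any existing ID, as determined by an input predicate. The following is a minimal Java sketch of that idea, assuming a `prefix_N` naming scheme; `UniqueIdSketch` and its `uniqueId` method are hypothetical and may differ from Beam's actual implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical sketch of collision-free synthetic ID generation; not Beam's actual code. */
public class UniqueIdSketch {
  private UniqueIdSketch() {}

  /**
   * Returns the first ID of the form prefix + "_" + n (n = 0, 1, 2, ...) that the
   * predicate reports as unused.
   */
  public static String uniqueId(String prefix, Predicate<String> idUsed) {
    int suffix = 0;
    String candidate = prefix + "_" + suffix;
    // Keep incrementing the numeric suffix until the candidate is free.
    while (idUsed.test(candidate)) {
      suffix++;
      candidate = prefix + "_" + suffix;
    }
    return candidate;
  }

  public static void main(String[] args) {
    // IDs already present in the (hypothetical) pipeline components.
    Set<String> existing = new HashSet<>(Set.of("pc_0", "pc_1"));
    String id = uniqueId("pc", existing::contains);
    existing.add(id); // the caller registers the new ID so later calls skip it
    System.out.println(id); // prints "pc_2"
  }
}
```

Taking a predicate instead of a concrete collection lets callers check against whatever ID namespace applies (PCollections, coders, transforms) without copying the IDs into a new set.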
[jira] [Work logged] (BEAM-3972) Flink runner translates batch pipelines directly by proto
[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=95159=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95159 ] ASF GitHub Bot logged work on BEAM-3972: Author: ASF GitHub Bot Created on: 25/Apr/18 18:18 Start Date: 25/Apr/18 18:18 Worklog Time Spent: 10m Work Description: bsidhom opened a new pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto URL: https://github.com/apache/beam/pull/5226 Note that Flink does not provide a way to make assertions on the shape of the output graph, and we can't yet execute ExecutableStages. The plan is to test translation end-to-end with ValidatesRunner tests once the missing pieces are in. Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [ ] Write a pull request description that is detailed enough to understand: - [ ] What the pull request does - [ ] Why it does it - [ ] How it does it - [ ] Why this approach - [ ] Each commit in the pull request should have a meaningful subject line and body. - [ ] Run `./gradlew build` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
Issue Time Tracking --- Worklog Id: (was: 95159) Time Spent: 10m Remaining Estimate: 0h > Flink runner translates batch pipelines directly by proto > - > > Key: BEAM-3972 > URL: https://issues.apache.org/jira/browse/BEAM-3972 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Ben Sidhom >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > The non-portable runner uses rehydrated pipelines which lack necessary > information. The portable Flink runner needs to translate pipelines directly > by proto in order to wire components into individual executable stages > correctly.