http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java new file mode 100644 index 0000000..123d5e7 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -0,0 +1,1044 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.runners.core.ElementAndRestriction; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; +import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; +import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import 
org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.Reshuffle; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; +import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class contains all the mappings between Beam and Flink + * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator} + * traverses the Beam job and comes here to translate the encountered Beam transformations + * into Flink one, based on the mapping available in this class. + */ +class FlinkStreamingTransformTranslators { + + // -------------------------------------------------------------------------------------------- + // Transform Translator Registry + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static final Map< + Class<? extends PTransform>, + FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>(); + + // here you can find all the available translators. 
  // Static registry wiring each Beam transform class to its streaming translator.
  // Lookup is by exact runtime class (see getTranslator below), so subclasses of a
  // registered transform are NOT matched.
  static {
    TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
    TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
    TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());

    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
    TRANSLATORS.put(
        SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator());
    TRANSLATORS.put(
        SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator());


    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
    TRANSLATORS.put(
        FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
        new CreateViewStreamingTranslator());

    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
  }

  /**
   * Returns the translator registered for the concrete class of the given transform,
   * or {@code null} when no translator is registered for it.
   */
  public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
      PTransform<?, ?> transform) {
    return TRANSLATORS.get(transform.getClass());
  }

  // --------------------------------------------------------------------------------------------
  //  Transformation Implementations
  // --------------------------------------------------------------------------------------------

  /**
   * Translates {@link TextIO.Write.Bound} by unwrapping each windowed String element and
   * writing it with Flink's {@code writeAsText}, always in OVERWRITE mode. Several
   * TextIO options (validation, suffix, shard template) are not yet supported and are
   * only logged as warnings below.
   */
  private static class TextIOWriteBoundStreamingTranslator
      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {

    private static final Logger LOG =
        LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);

    @Override
    public void translateNode(
        TextIO.Write.Bound transform,
        FlinkStreamingTranslationContext context) {
      PValue input = context.getInput(transform);
      DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);

      String filenamePrefix = transform.getFilenamePrefix();
      String filenameSuffix = transform.getFilenameSuffix();
      boolean needsValidation = transform.needsValidation();
      int numShards = transform.getNumShards();
      String shardNameTemplate = transform.getShardNameTemplate();

      // TODO: Implement these. We need Flink support for this.
      LOG.warn(
          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
          needsValidation);
      LOG.warn(
          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
          filenameSuffix);
      LOG.warn(
          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
          shardNameTemplate);

      // Strip the windowing metadata; only the raw element value is written out.
      DataStream<String> dataSink = inputDataStream
          .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
            @Override
            public void flatMap(
                WindowedValue<String> value,
                Collector<String> out)
                throws Exception {
              out.collect(value.getValue());
            }
          });
      DataStreamSink<String> output =
          dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);

      // numShards maps directly to sink parallelism; 0 means "use the default".
      if (numShards > 0) {
        output.setParallelism(numShards);
      }
    }
  }

  /**
   * Translates {@link Read.Unbounded} into a Flink source backed by
   * {@link UnboundedSourceWrapper}, sized to the environment's parallelism.
   */
  private static class UnboundedReadSourceTranslator<T>
      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {

    @Override
    public void translateNode(
        Read.Unbounded<T> transform,
        FlinkStreamingTranslationContext context) {
      PCollection<T> output = context.getOutput(transform);

      TypeInformation<WindowedValue<T>> outputTypeInfo =
          context.getTypeInfo(context.getOutput(transform));

      DataStream<WindowedValue<T>> source;
      try {
        UnboundedSourceWrapper<T, ?> sourceWrapper =
            new UnboundedSourceWrapper<>(
                context.getPipelineOptions(),
                transform.getSource(),
                context.getExecutionEnvironment().getParallelism());
        source = context
            .getExecutionEnvironment()
            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
      } catch (Exception e) {
        // Wrapper construction may fail (e.g. while splitting the source); surface
        // the failing source in the message and preserve the cause.
        throw new RuntimeException(
            "Error while translating UnboundedSource: " + transform.getSource(), e);
      }

      context.setOutputDataStream(output, source);
    }
  }

  /**
   * Translates {@link Read.Bounded} into a Flink source backed by
   * {@link BoundedSourceWrapper}, sized to the environment's parallelism.
   */
  private static class BoundedReadSourceTranslator<T>
      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {

    @Override
    public void translateNode(
        Read.Bounded<T> transform,
        FlinkStreamingTranslationContext context) {
      PCollection<T> output = context.getOutput(transform);

      TypeInformation<WindowedValue<T>> outputTypeInfo =
          context.getTypeInfo(context.getOutput(transform));


      DataStream<WindowedValue<T>> source;
      try {
        BoundedSourceWrapper<T> sourceWrapper =
            new BoundedSourceWrapper<>(
                context.getPipelineOptions(),
                transform.getSource(),
                context.getExecutionEnvironment().getParallelism());
        source = context
            .getExecutionEnvironment()
            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
      } catch (Exception e) {
        throw new RuntimeException(
            "Error while translating BoundedSource: " + transform.getSource(), e);
      }

      context.setOutputDataStream(output, source);
    }
  }

  /**
   * Wraps each element in a {@link RawUnionValue} with the given tag id.
   */
  private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
    // Integer tag identifying which side input this element belongs to.
    private final int intTag;

    public ToRawUnion(int intTag) {
      this.intTag = intTag;
    }

    @Override
    public RawUnionValue map(T o) throws Exception {
      return new RawUnionValue(intTag, o);
    }
  }

  /**
   * Unions all side-input streams into a single {@link RawUnionValue} stream.
   *
   * <p>Each side input is assigned a dense integer tag (by iteration order of
   * {@code sideInputs}); the returned tuple carries the tag-to-view mapping ({@code f0})
   * and the unioned stream ({@code f1}) in which every element is tagged with the id of
   * the side input it came from.
   *
   * @throws IllegalStateException if a side-input stream's type information is not a
   *     {@link CoderTypeInformation}, or if {@code sideInputs} turned out to be empty
   *     (callers must only invoke this with at least one side input).
   */
  private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
      transformSideInputs(
          Collection<PCollectionView<?>> sideInputs,
          FlinkStreamingTranslationContext context) {

    // collect all side inputs
    Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
    Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
    int count = 0;
    for (PCollectionView<?> sideInput: sideInputs) {
      TupleTag<?> tag = sideInput.getTagInternal();
      intToViewMapping.put(count, sideInput);
      tagToIntMapping.put(tag, count);
      count++;
      // NOTE(review): this local is never used afterwards — candidate for removal.
      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
    }


    // Gather the element coder of every side-input stream to build the union coder.
    List<Coder<?>> inputCoders = new ArrayList<>();
    for (PCollectionView<?> sideInput: sideInputs) {
      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
      TypeInformation<Object> tpe = sideInputStream.getType();
      if (!(tpe instanceof CoderTypeInformation)) {
        throw new IllegalStateException(
            "Input Stream TypeInformation is no CoderTypeInformation.");
      }

      Coder<?> coder = ((CoderTypeInformation) tpe).getCoder();
      inputCoders.add(coder);
    }

    UnionCoder unionCoder = UnionCoder.of(inputCoders);

    CoderTypeInformation<RawUnionValue> unionTypeInformation =
        new CoderTypeInformation<>(unionCoder);

    // transform each side input to RawUnionValue and union them
    DataStream<RawUnionValue> sideInputUnion = null;

    for (PCollectionView<?> sideInput: sideInputs) {
      TupleTag<?> tag = sideInput.getTagInternal();
      final int intTag = tagToIntMapping.get(tag);
      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
      DataStream<RawUnionValue> unionValueStream =
          sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation);

      if (sideInputUnion == null) {
        sideInputUnion = unionValueStream;
      } else {
        sideInputUnion = sideInputUnion.union(unionValueStream);
      }
    }

    if (sideInputUnion == null) {
      throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
    }

    return new Tuple2<>(intToViewMapping, sideInputUnion);
  }

  /**
   * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}.
   */
  static class ParDoTranslationHelper {

    /**
     * Factory callback that lets each caller decide which concrete {@link DoFnOperator}
     * subclass to instantiate once the shared translation work below is done.
     */
    interface DoFnOperatorFactory<InputT, OutputT> {
      DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
          DoFn<InputT, OutputT> doFn,
          List<PCollectionView<?>> sideInputs,
          TupleTag<OutputT> mainOutputTag,
          List<TupleTag<?>> additionalOutputTags,
          FlinkStreamingTranslationContext context,
          WindowingStrategy<?, ?> windowingStrategy,
          Map<TupleTag<?>, Integer> tagsToLabels,
          Coder<WindowedValue<InputT>> inputCoder,
          Coder keyCoder,
          Map<Integer, PCollectionView<?>> transformedSideInputs);
    }

    /**
     * Shared ParDo translation: builds the DoFn operator (via the factory), wires up
     * side inputs (broadcast + union stream) and keying for stateful/splittable fns,
     * runs the operator producing a tagged union output, then splits that union back
     * into the individual tagged output streams.
     */
    static <InputT, OutputT> void translateParDo(
        String transformName,
        DoFn<InputT, OutputT> doFn,
        PCollection<InputT> input,
        List<PCollectionView<?>> sideInputs,
        Map<TupleTag<?>, PValue> outputs,
        TupleTag<OutputT> mainOutputTag,
        List<TupleTag<?>> additionalOutputTags,
        FlinkStreamingTranslationContext context,
        DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {

      // we assume that the transformation does not change the windowing strategy.
      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();

      Map<TupleTag<?>, Integer> tagsToLabels =
          transformTupleTagsToLabels(mainOutputTag, outputs);

      SingleOutputStreamOperator<RawUnionValue> unionOutputStream;

      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);

      DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);

      // keyCoder stays null for stateless, non-splittable fns; "stateful" controls the
      // manual two-input wiring in the side-input branch below.
      Coder keyCoder = null;
      boolean stateful = false;
      DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
      if (signature.stateDeclarations().size() > 0
          || signature.timerDeclarations().size() > 0) {
        // Based on the fact that the signature is stateful, DoFnSignatures ensures
        // that it is also keyed
        keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
        inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
        stateful = true;
      } else if (doFn instanceof SplittableParDo.ProcessFn) {
        // we know that it is keyed on String
        keyCoder = StringUtf8Coder.of();
        stateful = true;
      }

      if (sideInputs.isEmpty()) {
        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
            doFnOperatorFactory.createDoFnOperator(
                doFn,
                sideInputs,
                mainOutputTag,
                additionalOutputTags,
                context,
                windowingStrategy,
                tagsToLabels,
                inputCoder,
                keyCoder,
                new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);

        UnionCoder outputUnionCoder = createUnionCoder(outputs);

        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
            new CoderTypeInformation<>(outputUnionCoder);

        unionOutputStream = inputDataStream
            .transform(transformName, outputUnionTypeInformation, doFnOperator);

      } else {
        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
            transformSideInputs(sideInputs, context);

        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
            doFnOperatorFactory.createDoFnOperator(
                doFn,
                sideInputs,
                mainOutputTag,
                additionalOutputTags,
                context,
                windowingStrategy,
                tagsToLabels,
                inputCoder,
                keyCoder,
                transformedSideInputs.f0);

        UnionCoder outputUnionCoder = createUnionCoder(outputs);

        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
            new CoderTypeInformation<>(outputUnionCoder);

        if (stateful) {
          // we have to manually construct the two-input transform because we're not
          // allowed to have only one input keyed, normally.
          KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
          TwoInputTransformation<
              WindowedValue<KV<?, InputT>>,
              RawUnionValue,
              WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
                  keyedStream.getTransformation(),
                  transformedSideInputs.f1.broadcast().getTransformation(),
                  transformName,
                  (TwoInputStreamOperator) doFnOperator,
                  outputUnionTypeInformation,
                  keyedStream.getParallelism());

          // Propagate key type/selector to input 1 only; the broadcast side input
          // (input 2) is deliberately left un-keyed.
          rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
          rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);

          unionOutputStream = new SingleOutputStreamOperator(
              keyedStream.getExecutionEnvironment(),
              rawFlinkTransform) {}; // we have to cheat around the ctor being protected

          keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);

        } else {
          unionOutputStream = inputDataStream
              .connect(transformedSideInputs.f1.broadcast())
              .transform(transformName, outputUnionTypeInformation, doFnOperator);
        }
      }

      // Route each RawUnionValue to the output selected by its union tag, then unwrap.
      SplitStream<RawUnionValue> splitStream = unionOutputStream
          .split(new OutputSelector<RawUnionValue>() {
            @Override
            public Iterable<String> select(RawUnionValue value) {
              return Collections.singletonList(Integer.toString(value.getUnionTag()));
            }
          });

      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
        final int outputTag = tagsToLabels.get(output.getKey());

        TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());

        @SuppressWarnings("unchecked")
        DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
            .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
              @Override
              public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
                out.collect(value.getValue());
              }
            }).returns(outputTypeInfo);

        context.setOutputDataStream(output.getValue(), unwrapped);
      }
    }

    /**
     * Assigns dense integer labels to all output tags, with the main output always
     * getting label 0.
     */
    private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
        TupleTag<?> mainTag,
        Map<TupleTag<?>, PValue> allTaggedValues) {

      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
      int count = 0;
      tagToLabelMap.put(mainTag, count++);
      for (TupleTag<?> key : allTaggedValues.keySet()) {
        if (!tagToLabelMap.containsKey(key)) {
          tagToLabelMap.put(key, count++);
        }
      }
      return tagToLabelMap;
    }

    /**
     * Builds a {@link UnionCoder} over the full windowed-value coders of all tagged
     * output collections, in the iteration order of the given map.
     *
     * @throws IllegalArgumentException if any tagged value is not a {@link PCollection}
     */
    private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) {
      List<Coder<?>> outputCoders = Lists.newArrayList();
      for (PValue taggedColl : taggedCollections.values()) {
        checkArgument(
            taggedColl instanceof PCollection,
            "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
            PCollection.class.getSimpleName(),
            taggedColl.getClass().getSimpleName());
        PCollection<?> coll = (PCollection<?>) taggedColl;
        WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
            WindowedValue.getFullCoder(
                coll.getCoder(),
                coll.getWindowingStrategy().getWindowFn().windowCoder());
        outputCoders.add(windowedValueCoder);
      }
      return UnionCoder.of(outputCoders);
    }
  }

  /**
   * Translates {@link ParDo.MultiOutput} via {@link ParDoTranslationHelper}, producing
   * a plain {@link DoFnOperator}.
   */
  private static class ParDoStreamingTranslator<InputT, OutputT>
      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
          ParDo.MultiOutput<InputT, OutputT>> {

    @Override
    public void translateNode(
        ParDo.MultiOutput<InputT, OutputT> transform,
        FlinkStreamingTranslationContext context) {

      ParDoTranslationHelper.translateParDo(
          transform.getName(),
          transform.getFn(),
          (PCollection<InputT>) context.getInput(transform),
          transform.getSideInputs(),
          context.getOutputs(transform),
          transform.getMainOutputTag(),
          transform.getAdditionalOutputTags().getAll(),
          context,
          new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
            @Override
            public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
                DoFn<InputT, OutputT> doFn,
                List<PCollectionView<?>> sideInputs,
                TupleTag<OutputT> mainOutputTag,
                List<TupleTag<?>> additionalOutputTags,
                FlinkStreamingTranslationContext context,
                WindowingStrategy<?, ?> windowingStrategy,
                Map<TupleTag<?>, Integer> tagsToLabels,
                Coder<WindowedValue<InputT>> inputCoder,
                Coder keyCoder,
                Map<Integer, PCollectionView<?>> transformedSideInputs) {
              return new DoFnOperator<>(
                  doFn,
                  inputCoder,
                  mainOutputTag,
                  additionalOutputTags,
                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
                  windowingStrategy,
                  transformedSideInputs,
                  sideInputs,
                  context.getPipelineOptions(),
                  keyCoder);
            }
          });
    }
  }

  /**
   * Translates {@link SplittableParDo.ProcessElements} via {@link ParDoTranslationHelper},
   * producing a {@link SplittableDoFnOperator} over String-keyed work items.
   */
  private static class SplittableProcessElementsStreamingTranslator<
      InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> { + + @Override + public void translateNode( + SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform, + FlinkStreamingTranslationContext context) { + + ParDoTranslationHelper.translateParDo( + transform.getName(), + transform.newProcessFn(transform.getFn()), + (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>) + context.getInput(transform), + transform.getSideInputs(), + context.getOutputs(transform), + transform.getMainOutputTag(), + transform.getAdditionalOutputTags().getAll(), + context, + new ParDoTranslationHelper.DoFnOperatorFactory< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() { + @Override + public DoFnOperator< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, + OutputT, + RawUnionValue> createDoFnOperator( + DoFn< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, + OutputT> doFn, + List<PCollectionView<?>> sideInputs, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + FlinkStreamingTranslationContext context, + WindowingStrategy<?, ?> windowingStrategy, + Map<TupleTag<?>, Integer> tagsToLabels, + Coder< + WindowedValue< + KeyedWorkItem< + String, + ElementAndRestriction<InputT, RestrictionT>>>> inputCoder, + Coder keyCoder, + Map<Integer, PCollectionView<?>> transformedSideInputs) { + return new SplittableDoFnOperator<>( + doFn, + inputCoder, + mainOutputTag, + additionalOutputTags, + new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels), + windowingStrategy, + transformedSideInputs, + sideInputs, + context.getPipelineOptions(), + keyCoder); + } + }); + } + } + + private static class CreateViewStreamingTranslator<ElemT, ViewT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + 
FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> { + + @Override + public void translateNode( + FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform, + FlinkStreamingTranslationContext context) { + // just forward + DataStream<WindowedValue<List<ElemT>>> inputDataSet = + context.getInputDataStream(context.getInput(transform)); + + PCollectionView<ViewT> view = context.getOutput(transform); + + context.setOutputDataStream(view, inputDataSet); + } + } + + private static class WindowAssignTranslator<T> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> { + + @Override + public void translateNode( + Window.Assign<T> transform, + FlinkStreamingTranslationContext context) { + + @SuppressWarnings("unchecked") + WindowingStrategy<T, BoundedWindow> windowingStrategy = + (WindowingStrategy<T, BoundedWindow>) + context.getOutput(transform).getWindowingStrategy(); + + TypeInformation<WindowedValue<T>> typeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DataStream<WindowedValue<T>> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + + WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); + + FlinkAssignWindows<T, ? 
extends BoundedWindow> assignWindowsFunction = + new FlinkAssignWindows<>(windowFn); + + SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream + .flatMap(assignWindowsFunction) + .name(context.getOutput(transform).getName()) + .returns(typeInfo); + + context.setOutputDataStream(context.getOutput(transform), outputDataStream); + } + } + + private static class ReshuffleTranslatorStreaming<K, InputT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> { + + @Override + public void translateNode( + Reshuffle<K, InputT> transform, + FlinkStreamingTranslationContext context) { + + DataStream<WindowedValue<KV<K, InputT>>> inputDataSet = + context.getInputDataStream(context.getInput(transform)); + + context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance()); + + } + } + + + private static class GroupByKeyTranslator<K, InputT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> { + + @Override + public void translateNode( + GroupByKey<K, InputT> transform, + FlinkStreamingTranslationContext context) { + + PCollection<KV<K, InputT>> input = context.getInput(transform); + + @SuppressWarnings("unchecked") + WindowingStrategy<?, BoundedWindow> windowingStrategy = + (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy(); + + KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); + + SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( + inputKvCoder.getKeyCoder(), + inputKvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()); + + DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); + + WindowedValue. 
          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
          WindowedValue.getFullCoder(
              workItemCoder,
              input.getWindowingStrategy().getWindowFn().windowCoder());

      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
          new CoderTypeInformation<>(windowedWorkItemCoder);

      // Wrap every KV into a single-element keyed work item before keying.
      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
          inputDataStream
              .flatMap(new ToKeyedWorkItem<K, InputT>())
              .returns(workItemTypeInfo).name("ToKeyedWorkItem");

      // Key by the encoded key bytes (ByteBuffer) so any key type can be used.
      KeyedStream<
          WindowedValue<
              SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));

      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
          SystemReduceFn.buffering(inputKvCoder.getValueCoder());

      TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
          context.getTypeInfo(context.getOutput(transform));

      DoFnOperator.DefaultOutputManagerFactory<
          WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
          new DoFnOperator.DefaultOutputManagerFactory<>();

      WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
          new WindowDoFnOperator<>(
              reduceFn,
              (Coder) windowedWorkItemCoder,
              new TupleTag<KV<K, Iterable<InputT>>>("main output"),
              Collections.<TupleTag<?>>emptyList(),
              outputManagerFactory,
              windowingStrategy,
              new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
              Collections.<PCollectionView<?>>emptyList(), /* side inputs */
              context.getPipelineOptions(),
              inputKvCoder.getKeyCoder());

      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
      @SuppressWarnings("unchecked")
      SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream =
          keyedWorkItemStream
              .transform(
                  transform.getName(),
                  outputTypeInfo,
                  (OneInputStreamOperator) doFnOperator);

      context.setOutputDataStream(context.getOutput(transform), outDataStream);

    }
  }

  /**
   * Translates {@link Combine.PerKey} as a keyed {@link WindowDoFnOperator} running a
   * combining {@link SystemReduceFn}; side inputs are fed in via a broadcast union
   * stream using a manually constructed two-input transformation.
   */
  private static class CombinePerKeyTranslator<K, InputT, OutputT>
      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
          Combine.PerKey<K, InputT, OutputT>> {

    @Override
    boolean canTranslate(
        Combine.PerKey<K, InputT, OutputT> transform,
        FlinkStreamingTranslationContext context) {

      // if we have a merging window strategy and side inputs we cannot
      // translate as a proper combine. We have to group and then run the combine
      // over the final grouped values.
      PCollection<KV<K, InputT>> input = context.getInput(transform);

      @SuppressWarnings("unchecked")
      WindowingStrategy<?, BoundedWindow> windowingStrategy =
          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();

      return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
    }

    @Override
    public void translateNode(
        Combine.PerKey<K, InputT, OutputT> transform,
        FlinkStreamingTranslationContext context) {

      PCollection<KV<K, InputT>> input = context.getInput(transform);

      @SuppressWarnings("unchecked")
      WindowingStrategy<?, BoundedWindow> windowingStrategy =
          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();

      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();

      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
          inputKvCoder.getKeyCoder(),
          inputKvCoder.getValueCoder(),
          input.getWindowingStrategy().getWindowFn().windowCoder());

      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);

      WindowedValue.
+ FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = + WindowedValue.getFullCoder( + workItemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = + new CoderTypeInformation<>(windowedWorkItemCoder); + + DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = + inputDataStream + .flatMap(new ToKeyedWorkItem<K, InputT>()) + .returns(workItemTypeInfo).name("ToKeyedWorkItem"); + + KeyedStream< + WindowedValue< + SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream + .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); + + SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining( + inputKvCoder.getKeyCoder(), + AppliedCombineFn.withInputCoder( + transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder)); + + TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + List<PCollectionView<?>> sideInputs = transform.getSideInputs(); + + if (sideInputs.isEmpty()) { + + WindowDoFnOperator<K, InputT, OutputT> doFnOperator = + new WindowDoFnOperator<>( + reduceFn, + (Coder) windowedWorkItemCoder, + new TupleTag<KV<K, OutputT>>("main output"), + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(), + windowingStrategy, + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + context.getPipelineOptions(), + inputKvCoder.getKeyCoder()); + + // our operator excepts WindowedValue<KeyedWorkItem> while our input stream + // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ... 
+ @SuppressWarnings("unchecked") + SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = + keyedWorkItemStream.transform( + transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } else { + Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs = + transformSideInputs(sideInputs, context); + + WindowDoFnOperator<K, InputT, OutputT> doFnOperator = + new WindowDoFnOperator<>( + reduceFn, + (Coder) windowedWorkItemCoder, + new TupleTag<KV<K, OutputT>>("main output"), + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(), + windowingStrategy, + transformSideInputs.f0, + sideInputs, + context.getPipelineOptions(), + inputKvCoder.getKeyCoder()); + + // we have to manually construct the two-input transform because we're not + // allowed to have only one input keyed, normally. + + TwoInputTransformation< + WindowedValue<SingletonKeyedWorkItem<K, InputT>>, + RawUnionValue, + WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>( + keyedWorkItemStream.getTransformation(), + transformSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + outputTypeInfo, + keyedWorkItemStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = + new SingleOutputStreamOperator( + keyedWorkItemStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + + context.setOutputDataStream(context.getOutput(transform), 
outDataStream); + } + } + } + + private static class GBKIntoKeyedWorkItemsTranslator<K, InputT> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> { + + @Override + boolean canTranslate( + SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform, + FlinkStreamingTranslationContext context) { + return true; + } + + @Override + public void translateNode( + SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform, + FlinkStreamingTranslationContext context) { + + PCollection<KV<K, InputT>> input = context.getInput(transform); + + KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder(); + + SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of( + inputKvCoder.getKeyCoder(), + inputKvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()); + + + WindowedValue. + FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = + WindowedValue.getFullCoder( + workItemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = + new CoderTypeInformation<>(windowedWorkItemCoder); + + DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input); + + DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = + inputDataStream + .flatMap(new ToKeyedWorkItem<K, InputT>()) + .returns(workItemTypeInfo).name("ToKeyedWorkItem"); + + KeyedStream< + WindowedValue< + SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream + .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder())); + + context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream); + } + } + + private static class FlattenPCollectionTranslator<T> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + Flatten.PCollections<T>> { + + 
@Override + public void translateNode( + Flatten.PCollections<T> transform, + FlinkStreamingTranslationContext context) { + Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform); + + if (allInputs.isEmpty()) { + + // create an empty dummy source to satisfy downstream operations + // we cannot create an empty source in Flink, therefore we have to + // add the flatMap that simply never forwards the single element + DataStreamSource<String> dummySource = + context.getExecutionEnvironment().fromElements("dummy"); + + DataStream<WindowedValue<T>> result = dummySource.flatMap( + new FlatMapFunction<String, WindowedValue<T>>() { + @Override + public void flatMap( + String s, + Collector<WindowedValue<T>> collector) throws Exception { + // never return anything + } + }).returns( + new CoderTypeInformation<>( + WindowedValue.getFullCoder( + (Coder<T>) VoidCoder.of(), + GlobalWindow.Coder.INSTANCE))); + context.setOutputDataStream(context.getOutput(transform), result); + + } else { + DataStream<T> result = null; + for (PValue input : allInputs.values()) { + DataStream<T> current = context.getInputDataStream(input); + result = (result == null) ? 
current : result.union(current); + } + context.setOutputDataStream(context.getOutput(transform), result); + } + } + } + + private static class ToKeyedWorkItem<K, InputT> + extends RichFlatMapFunction< + WindowedValue<KV<K, InputT>>, + WindowedValue<SingletonKeyedWorkItem<K, InputT>>> { + + @Override + public void flatMap( + WindowedValue<KV<K, InputT>> inWithMultipleWindows, + Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception { + + // we need to wrap each one work item per window for now + // since otherwise the PushbackSideInputRunner will not correctly + // determine whether side inputs are ready + // + // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850 + for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) { + SingletonKeyedWorkItem<K, InputT> workItem = + new SingletonKeyedWorkItem<>( + in.getValue().getKey(), + in.withValue(in.getValue().getValue())); + + out.collect(in.withValue(workItem)); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java new file mode 100644 index 0000000..1a943a3 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Iterables; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Helper for keeping track of which {@link DataStream DataStreams} map + * to which {@link PTransform PTransforms}. + */ +class FlinkStreamingTranslationContext { + + private final StreamExecutionEnvironment env; + private final PipelineOptions options; + + /** + * Keeps a mapping between the output value of the PTransform (in Dataflow) and the + * Flink Operator that produced it, after the translation of the corresponding PTransform + * to its Flink equivalent. 
+ * */ + private final Map<PValue, DataStream<?>> dataStreams; + + private AppliedPTransform<?, ?, ?> currentTransform; + + public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { + this.env = checkNotNull(env); + this.options = checkNotNull(options); + this.dataStreams = new HashMap<>(); + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return env; + } + + public PipelineOptions getPipelineOptions() { + return options; + } + + @SuppressWarnings("unchecked") + public <T> DataStream<T> getInputDataStream(PValue value) { + return (DataStream<T>) dataStreams.get(value); + } + + public void setOutputDataStream(PValue value, DataStream<?> set) { + if (!dataStreams.containsKey(value)) { + dataStreams.put(value, set); + } + } + + /** + * Sets the AppliedPTransform which carries input/output. + * @param currentTransform + */ + public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { + this.currentTransform = currentTransform; + } + + public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) { + Coder<T> valueCoder = collection.getCoder(); + + return WindowedValue.getFullCoder( + valueCoder, + collection.getWindowingStrategy().getWindowFn().windowCoder()); + } + + @SuppressWarnings("unchecked") + public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) { + Coder<T> valueCoder = collection.getCoder(); + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = + WindowedValue.getFullCoder( + valueCoder, + collection.getWindowingStrategy().getWindowFn().windowCoder()); + + return new CoderTypeInformation<>(windowedValueCoder); + } + + + @SuppressWarnings("unchecked") + public <T extends PValue> T getInput(PTransform<T, ?> transform) { + return (T) Iterables.getOnlyElement(currentTransform.getInputs().values()); + } + + public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) { + return currentTransform.getInputs(); + 
} + + @SuppressWarnings("unchecked") + public <T extends PValue> T getOutput(PTransform<?, T> transform) { + return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values()); + } + + public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs( + PTransform<?, OutputT> transform) { + return currentTransform.getOutputs(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java new file mode 100644 index 0000000..f955f2a --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Flink streaming overrides for various view (side input) transforms. + */ +class FlinkStreamingViewOverrides { + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} + * for the Flink runner in streaming mode. 
+ */ + static class StreamingViewAsMap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + + private final transient FlinkRunner runner; + + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, V>> view = + PCollectionViews.mapView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + runner.recordViewUsesNonDeterministicKeyCoder(this); + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMap"; + } + } + + /** + * Specialized expansion for {@link + * View.AsMultimap View.AsMultimap} for the + * Flink runner in streaming mode. + */ + static class StreamingViewAsMultimap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { + + private final transient FlinkRunner runner; + + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, Iterable<V>>> view = + PCollectionViews.multimapView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + runner.recordViewUsesNonDeterministicKeyCoder(this); + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMultimap"; + } + } + + /** + * Specialized implementation for + * {@link View.AsList View.AsList} for the + * Flink runner in streaming mode. + */ + static class StreamingViewAsList<T> + extends PTransform<PCollection<T>, PCollectionView<List<T>>> { + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {} + + @Override + public PCollectionView<List<T>> expand(PCollection<T> input) { + PCollectionView<List<T>> view = + PCollectionViews.listView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateFlinkPCollectionView.<T, List<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsList"; + } + } + + /** + * Specialized implementation for + * {@link View.AsIterable View.AsIterable} for the + * Flink runner in streaming mode. + */ + static class StreamingViewAsIterable<T> + extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { } + + @Override + public PCollectionView<Iterable<T>> expand(PCollection<T> input) { + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsIterable"; + } + } + + /** + * Specialized expansion for + * {@link View.AsSingleton View.AsSingleton} for the + * Flink runner in streaming mode. + */ + static class StreamingViewAsSingleton<T> + extends PTransform<PCollection<T>, PCollectionView<T>> { + private View.AsSingleton<T> transform; + + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) { + this.transform = transform; + } + + @Override + public PCollectionView<T> expand(PCollection<T> input) { + Combine.Globally<T, T> combine = Combine.globally( + new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); + if (!transform.hasDefaultValue()) { + combine = combine.withoutDefaults(); + } + return input.apply(combine.asSingletonView()); + } + + @Override + protected String getKindString() { + return "StreamingViewAsSingleton"; + } + + private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { + private boolean hasDefaultValue; + private T defaultValue; + + SingletonCombine(boolean hasDefaultValue, T defaultValue) { + this.hasDefaultValue = hasDefaultValue; + this.defaultValue = defaultValue; + } + + @Override + public T apply(T left, T right) { + throw new IllegalArgumentException("PCollection with more than one element " + + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " + + "combine the PCollection into a single value"); + } + + @Override + public T identity() { + if (hasDefaultValue) { + return defaultValue; + } else { + throw new IllegalArgumentException( + "Empty PCollection accessed as a singleton view. " + + "Consider setting withDefault to provide a default value"); + } + } + } + } + + static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> + extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { + Combine.GloballyAsSingletonView<InputT, OutputT> transform; + + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingCombineGloballyAsSingletonView( + FlinkRunner runner, + Combine.GloballyAsSingletonView<InputT, OutputT> transform) { + this.transform = transform; + } + + @Override + public PCollectionView<OutputT> expand(PCollection<InputT> input) { + PCollection<OutputT> combined = + input.apply(Combine.globally(transform.getCombineFn()) + .withoutDefaults() + .withFanout(transform.getFanout())); + + PCollectionView<OutputT> view = PCollectionViews.singletonView( + combined, + combined.getWindowingStrategy(), + transform.getInsertDefault(), + transform.getInsertDefault() + ? transform.getCombineFn().defaultValue() : null, + combined.getCoder()); + return combined + .apply(ParDo.of(new WrapAsList<OutputT>())) + .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingCombineGloballyAsSingletonView"; + } + } + + private static class WrapAsList<T> extends DoFn<T, List<T>> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(Collections.singletonList(c.element())); + } + } + + /** + * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. + * + * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, + * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. + * They require the input {@link PCollection} fits in memory. + * For a large {@link PCollection} this is expected to crash! + * + * @param <T> the type of elements to concatenate. 
+ */ + private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { + @Override + public List<T> createAccumulator() { + return new ArrayList<T>(); + } + + @Override + public List<T> addInput(List<T> accumulator, T input) { + accumulator.add(input); + return accumulator; + } + + @Override + public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { + List<T> result = createAccumulator(); + for (List<T> accumulator : accumulators) { + result.addAll(accumulator); + } + return result; + } + + @Override + public List<T> extractOutput(List<T> accumulator) { + return accumulator; + } + + @Override + public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + + @Override + public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + } + + /** + * Creates a primitive {@link PCollectionView}. + * + * <p>For internal use only by runner implementors. 
+ * + * @param <ElemT> The type of the elements of the input PCollection + * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input + */ + public static class CreateFlinkPCollectionView<ElemT, ViewT> + extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { + private PCollectionView<ViewT> view; + + private CreateFlinkPCollectionView(PCollectionView<ViewT> view) { + this.view = view; + } + + public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of( + PCollectionView<ViewT> view) { + return new CreateFlinkPCollectionView<>(view); + } + + @Override + public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { + return view; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java new file mode 100644 index 0000000..3acc3ea --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. + */ +class PipelineTranslationOptimizer extends FlinkPipelineTranslator { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class); + + private TranslationMode translationMode; + + private final FlinkPipelineOptions options; + + public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) { + this.translationMode = defaultMode; + this.options = options; + } + + public TranslationMode getTranslationMode() { + + // override user-specified translation mode + if (options.isStreaming()) { + return TranslationMode.STREAMING; + } + + return translationMode; + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformHierarchy.Node node) {} + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + Class<? extends PTransform> transformClass = node.getTransform().getClass(); + if (transformClass == Read.Unbounded.class) { + LOG.info("Found {}. 
Switching to streaming execution.", transformClass); + translationMode = TranslationMode.STREAMING; + } + } + + @Override + public void visitValue(PValue value, TransformHierarchy.Node producer) {} +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java new file mode 100644 index 0000000..8f50105 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.util.UserCodeException; + +/** + * Test Flink runner. + */ +public class TestFlinkRunner extends PipelineRunner<PipelineResult> { + + private FlinkRunner delegate; + + private TestFlinkRunner(FlinkPipelineOptions options) { + // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment + options.setFlinkMaster("[auto]"); + this.delegate = FlinkRunner.fromOptions(options); + } + + public static TestFlinkRunner fromOptions(PipelineOptions options) { + FlinkPipelineOptions flinkOptions = + PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); + return new TestFlinkRunner(flinkOptions); + } + + public static TestFlinkRunner create(boolean streaming) { + FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + flinkOptions.setRunner(TestFlinkRunner.class); + flinkOptions.setStreaming(streaming); + return TestFlinkRunner.fromOptions(flinkOptions); + } + + @Override + public PipelineResult run(Pipeline pipeline) { + try { + return delegate.run(pipeline); + } catch (Throwable t) { + // Special case hack to pull out assertion errors from PAssert; instead there should + // probably be a better story along the lines of UserCodeException. 
+ UserCodeException innermostUserCodeException = null; + Throwable current = t; + for (; current.getCause() != null; current = current.getCause()) { + if (current instanceof UserCodeException) { + innermostUserCodeException = ((UserCodeException) current); + } + } + if (innermostUserCodeException != null) { + current = innermostUserCodeException.getCause(); + } + if (current instanceof AssertionError) { + throw (AssertionError) current; + } + throw new PipelineExecutionException(current); + } + } + + public PipelineOptions getPipelineOptions() { + return delegate.getPipelineOptions(); + } +} + + http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java new file mode 100644 index 0000000..ad54750 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +/** + * The translation mode of the Beam Pipeline. + */ +enum TranslationMode { + + /** Uses the batch mode of Flink. */ + BATCH, + + /** Uses the streaming mode of Flink. */ + STREAMING + +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java new file mode 100644 index 0000000..57f1e59 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. 
+ */ +package org.apache.beam.runners.flink; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java new file mode 100644 index 0000000..fb2493b --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * A {@link AggregatorFactory} for the Flink Batch Runner. + */ +public class FlinkAggregatorFactory implements AggregatorFactory{ + + private final RuntimeContext runtimeContext; + + public FlinkAggregatorFactory(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( + Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName, + Combine.CombineFn<InputT, AccumT, OutputT> combine) { + @SuppressWarnings("unchecked") + SerializableFnAggregatorWrapper<InputT, OutputT> result = + (SerializableFnAggregatorWrapper<InputT, OutputT>) + runtimeContext.getAccumulator(aggregatorName); + + if (result == null) { + result = new SerializableFnAggregatorWrapper<>(combine); + runtimeContext.addAccumulator(aggregatorName, result); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java new file mode 100644 index 0000000..447b1e5 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java @@ -0,0 +1,63 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.Iterables; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.joda.time.Instant; + +/** + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for + * Flink functions. 
+ */ +class FlinkAssignContext<InputT, W extends BoundedWindow> + extends WindowFn<InputT, W>.AssignContext { + private final WindowedValue<InputT> value; + + FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { + fn.super(); + checkArgument( + Iterables.size(value.getWindows()) == 1, + String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); + this.value = value; + } + + @Override + public InputT element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public BoundedWindow window() { + return Iterables.getOnlyElement(value.getWindows()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java new file mode 100644 index 0000000..c3a5095 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import java.util.Collection; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +/** + * Flink {@link FlatMapFunction} for implementing + * {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}. + */ +public class FlinkAssignWindows<T, W extends BoundedWindow> + implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { + + private final WindowFn<T, W> windowFn; + + public FlinkAssignWindows(WindowFn<T, W> windowFn) { + this.windowFn = windowFn; + } + + @Override + public void flatMap( + WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception { + Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input)); + for (W window: windows) { + collector.collect( + WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane())); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Encapsulates a {@link DoFn}
 * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
 *
 * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
 * and must tag all outputs with the output number. Afterwards a filter will filter out
 * those elements that are not to be in a specific output.
 */
public class FlinkDoFnFunction<InputT, OutputT>
    extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {

  // Pipeline options are carried in serialized form (deserialized lazily in mapPartition).
  private final SerializedPipelineOptions serializedOptions;

  private final DoFn<InputT, OutputT> doFn;
  // Side input view -> windowing strategy of that side input's PCollection.
  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;

  private final WindowingStrategy<?, ?> windowingStrategy;

  // Output tag -> union index; null when the DoFn has only its main output.
  private final Map<TupleTag<?>, Integer> outputMap;
  private final TupleTag<OutputT> mainOutputTag;

  // Created in open() and torn down in close(); never serialized with this function.
  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;

  public FlinkDoFnFunction(
      DoFn<InputT, OutputT> doFn,
      WindowingStrategy<?, ?> windowingStrategy,
      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
      PipelineOptions options,
      Map<TupleTag<?>, Integer> outputMap,
      TupleTag<OutputT> mainOutputTag) {

    this.doFn = doFn;
    this.sideInputs = sideInputs;
    this.serializedOptions = new SerializedPipelineOptions(options);
    this.windowingStrategy = windowingStrategy;
    this.outputMap = outputMap;
    this.mainOutputTag = mainOutputTag;

  }

  /**
   * Processes one Flink input partition with the wrapped {@link DoFn}: a single
   * startBundle/finishBundle pair bracketing processElement for every value.
   */
  @Override
  public void mapPartition(
      Iterable<WindowedValue<InputT>> values,
      Collector<WindowedValue<OutputT>> out) throws Exception {

    RuntimeContext runtimeContext = getRuntimeContext();

    DoFnRunners.OutputManager outputManager;
    if (outputMap == null) {
      // Single-output DoFn: forward main-output values directly to the collector.
      outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
    } else {
      // it has some additional outputs
      outputManager =
          new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
    }

    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
        serializedOptions.getPipelineOptions(), doFn,
        new FlinkSideInputReader(sideInputs, runtimeContext),
        outputManager,
        mainOutputTag,
        // see SimpleDoFnRunner, just use it to limit number of additional outputs
        Collections.<TupleTag<?>>emptyList(),
        new FlinkNoOpStepContext(),
        new FlinkAggregatorFactory(runtimeContext),
        windowingStrategy);

    doFnRunner.startBundle();

    for (WindowedValue<InputT> value : values) {
      doFnRunner.processElement(value);
    }

    doFnRunner.finishBundle();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    // Build the reflective invoker for the DoFn and run its @Setup once per task instance.
    doFnInvoker = DoFnInvokers.invokerFor(doFn);
    doFnInvoker.invokeSetup();
  }

  @Override
  public void close() throws Exception {
    // NOTE(review): if open() failed before doFnInvoker was assigned, this would NPE —
    // confirm whether Flink can invoke close() in that situation.
    doFnInvoker.invokeTeardown();
  }

  /** Output manager for the single-output case: emits every value straight to the collector. */
  static class DoFnOutputManager
      implements DoFnRunners.OutputManager {

    private Collector collector;

    DoFnOutputManager(Collector collector) {
      this.collector = collector;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
      collector.collect(output);
    }
  }

  /**
   * Output manager for the multi-output case: wraps each value in a {@link RawUnionValue}
   * carrying the tag's union index so a downstream filter can split the outputs apart.
   */
  static class MultiDoFnOutputManager
      implements DoFnRunners.OutputManager {

    private Collector<WindowedValue<RawUnionValue>> collector;
    private Map<TupleTag<?>, Integer> outputMap;

    MultiDoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector,
                           Map<TupleTag<?>, Integer> outputMap) {
      this.collector = collector;
      this.outputMap = outputMap;
    }

    @Override
    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
      // Preserve timestamp, windows, and pane; only the value is union-wrapped.
      collector.collect(WindowedValue.of(new RawUnionValue(outputMap.get(tag), output.getValue()),
          output.getTimestamp(), output.getWindows(), output.getPane()));
    }
  }

}