http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java deleted file mode 100644 index cd5cd40..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.io.BoundedSource; -import com.google.cloud.dataflow.sdk.io.Source; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.List; - -/** - * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a - * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}. 
- */ -public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> { - private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); - - private final BoundedSource<T> initialSource; - private transient PipelineOptions options; - - private BoundedSource.BoundedReader<T> reader = null; - private boolean reachedEnd = true; - - public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) { - this.initialSource = initialSource; - this.options = options; - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, options); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - options = mapper.readValue(in, PipelineOptions.class); - } - - @Override - public void configure(Configuration configuration) {} - - @Override - public void open(SourceInputSplit<T> sourceInputSplit) throws IOException { - reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options); - reachedEnd = false; - } - - @Override - public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { - try { - final long estimatedSize = initialSource.getEstimatedSizeBytes(options); - - return new BaseStatistics() { - @Override - public long getTotalInputSize() { - return estimatedSize; - - } - - @Override - public long getNumberOfRecords() { - return BaseStatistics.NUM_RECORDS_UNKNOWN; - } - - @Override - public float getAverageRecordWidth() { - return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN; - } - }; - } catch (Exception e) { - LOG.warn("Could not read Source statistics: {}", e); - } - - return null; - } - - @Override - @SuppressWarnings("unchecked") - public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException { - long desiredSizeBytes; - try { - desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; - List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, - options); - List<SourceInputSplit<T>> splits = new ArrayList<>(); - int splitCount = 0; - for (Source<T> shard: shards) { - splits.add(new SourceInputSplit<>(shard, splitCount++)); - } - return splits.toArray(new SourceInputSplit[splits.size()]); - } catch (Exception e) { - throw new IOException("Could not create input splits from Source.", e); - } - } - - @Override - public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) { - return new InputSplitAssigner() { - private int index = 0; - private final SourceInputSplit[] splits = sourceInputSplits; - @Override - public InputSplit getNextInputSplit(String host, int taskId) { - if (index < splits.length) { - return splits[index++]; - } else { - return null; - } - } - }; - } - - - @Override - public boolean reachedEnd() throws IOException { - return reachedEnd; - } - - @Override - public T nextRecord(T t) throws IOException { - - reachedEnd = !reader.advance(); - if (!reachedEnd) { - return reader.getCurrent(); - } - return null; - } - - @Override - public void close() throws IOException { - reader.close(); - } -}
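
The deleted SourceInputFormat above adapts a Beam BoundedSource to Flink's batch InputFormat lifecycle: createInputSplits() delegates to the source's splitIntoBundles(), getStatistics() surfaces getEstimatedSizeBytes() to Flink's optimizer, open() creates a BoundedReader for the assigned split, and nextRecord() pairs advance() with getCurrent(). As a rough illustration of that lifecycle, here is a minimal driver sketch; it is not part of this commit, and CountingSource.upTo(...) plus the harness class are assumptions for the example only:

import com.google.cloud.dataflow.sdk.io.CountingSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputSplit;
import org.apache.flink.configuration.Configuration;

public class SourceInputFormatSketch {
  public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    // Wrap a bounded source; CountingSource.upTo(...) is an assumed stand-in,
    // any BoundedSource<T> would do.
    SourceInputFormat<Long> format =
        new SourceInputFormat<>(CountingSource.upTo(1000), options);

    format.configure(new Configuration());      // no-op in this wrapper
    // One split per desired shard; sizes derive from getEstimatedSizeBytes().
    SourceInputSplit<Long>[] splits = format.createInputSplits(4);

    for (SourceInputSplit<Long> split : splits) {
      format.open(split);                       // creates the BoundedReader
      while (!format.reachedEnd()) {
        Long record = format.nextRecord(null);  // advance(), then getCurrent()
        if (record != null) {
          System.out.println(record);
        }
      }
      format.close();
    }
  }
}

In a real job Flink drives this sequence itself, handing out the splits through the simple sequential InputSplitAssigner shown above.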
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java deleted file mode 100644 index cde2b35..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers; - -import com.google.cloud.dataflow.sdk.io.Source; -import org.apache.flink.core.io.InputSplit; - -/** - * {@link org.apache.flink.core.io.InputSplit} for - * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}. We pass - * the sharded Source around in the input split because Sources simply split up into several - * Sources for sharding. This is different to how Flink creates a separate InputSplit from - * an InputFormat. - */ -public class SourceInputSplit<T> implements InputSplit { - - private Source<T> source; - private int splitNumber; - - public SourceInputSplit() { - } - - public SourceInputSplit(Source<T> source, int splitNumber) { - this.source = source; - this.splitNumber = splitNumber; - } - - @Override - public int getSplitNumber() { - return splitNumber; - } - - public Source<T> getSource() { - return source; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java deleted file mode 100644 index 10c8bbf..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.common.base.Preconditions; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; -import com.google.cloud.dataflow.sdk.util.*; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.base.Throwables; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; -import org.joda.time.format.PeriodFormat; - -import java.util.Collection; - -/** - * An abstract class that encapsulates the common code of the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} - * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and - * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations. - * */ -public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> { - - private final DoFn<IN, OUTDF> doFn; - private final WindowingStrategy<?, ?> windowingStrategy; - private transient PipelineOptions options; - - private DoFnProcessContext context; - - public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) { - Preconditions.checkNotNull(options); - Preconditions.checkNotNull(windowingStrategy); - Preconditions.checkNotNull(doFn); - - this.doFn = doFn; - this.options = options; - this.windowingStrategy = windowingStrategy; - } - - private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { - if (this.context == null) { - this.context = new DoFnProcessContext(function, outCollector); - } - } - - @Override - public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception { - this.initContext(doFn, out); - - // for each window the element belongs to, create a new copy here. - Collection<?
extends BoundedWindow> windows = value.getWindows(); - if (windows.size() <= 1) { - processElement(value); - } else { - for (BoundedWindow window : windows) { - processElement(WindowedValue.of( - value.getValue(), value.getTimestamp(), window, value.getPane())); - } - } - } - - private void processElement(WindowedValue<IN> value) throws Exception { - this.context.setElement(value); - this.doFn.startBundle(context); - doFn.processElement(context); - this.doFn.finishBundle(context); - } - - private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext { - - private final DoFn<IN, OUTDF> fn; - - protected final Collector<WindowedValue<OUTFL>> collector; - - private WindowedValue<IN> element; - - private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { - function.super(); - super.setupDelegateAggregators(); - - this.fn = function; - this.collector = outCollector; - } - - public void setElement(WindowedValue<IN> value) { - this.element = value; - } - - @Override - public IN element() { - return this.element.getValue(); - } - - @Override - public Instant timestamp() { - return this.element.getTimestamp(); - } - - @Override - public BoundedWindow window() { - if (!(fn instanceof DoFn.RequiresWindowAccess)) { - throw new UnsupportedOperationException( - "window() is only available in the context of a DoFn marked as RequiresWindow."); - } - - Collection<? extends BoundedWindow> windows = this.element.getWindows(); - if (windows.size() != 1) { - throw new IllegalArgumentException("Each element is expected to belong to 1 window. " + - "This belongs to " + windows.size() + "."); - } - return windows.iterator().next(); - } - - @Override - public PaneInfo pane() { - return this.element.getPane(); - } - - @Override - public WindowingInternals<IN, OUTDF> windowingInternals() { - return windowingInternalsHelper(element, collector); - } - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - throw new RuntimeException("sideInput() is not supported in Streaming mode."); - } - - @Override - public void output(OUTDF output) { - outputWithTimestamp(output, this.element.getTimestamp()); - } - - @Override - public void outputWithTimestamp(OUTDF output, Instant timestamp) { - outputWithTimestampHelper(element, output, timestamp, collector); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - sideOutputWithTimestamp(tag, output, this.element.getTimestamp()); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutputWithTimestampHelper(element, output, timestamp, collector, tag); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - Accumulator acc = getRuntimeContext().getAccumulator(name); - if (acc != null) { - AccumulatorHelper.compareAccumulatorTypes(name, - SerializableFnAggregatorWrapper.class, acc.getClass()); - return (Aggregator<AggInputT, AggOutputT>) acc; - } - - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = - new SerializableFnAggregatorWrapper<>(combiner); - getRuntimeContext().addAccumulator(name, accumulator); - return accumulator; - } - } - - protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) { - if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) { - throw new 
IllegalArgumentException(String.format( - "Cannot output with timestamp %s. Output timestamps must be no earlier than the " - + "timestamp of the current input (%s) minus the allowed skew (%s). See the " - + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.", - timestamp, ref.getTimestamp(), - PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod()))); - } - } - - protected <T> WindowedValue<T> makeWindowedValue( - T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - final WindowFn windowFn = windowingStrategy.getWindowFn(); - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - windows = windowFn.assignWindows(windowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } - - /////////// ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES ///////////////// - - public abstract void outputWithTimestampHelper( - WindowedValue<IN> inElement, - OUTDF output, - Instant timestamp, - Collector<WindowedValue<OUTFL>> outCollector); - - public abstract <T> void sideOutputWithTimestampHelper( - WindowedValue<IN> inElement, - T output, - Instant timestamp, - Collector<WindowedValue<OUTFL>> outCollector, - TupleTag<T> tag); - - public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper( - WindowedValue<IN> inElement, - Collector<WindowedValue<OUTFL>> outCollector); - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java deleted file mode 100644 index e115a15..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ /dev/null @@ -1,631 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.*; -import com.google.cloud.dataflow.sdk.coders.*; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.*; -import com.google.cloud.dataflow.sdk.values.*; -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.operators.*; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; -import org.joda.time.Instant; - -import java.io.IOException; -import java.util.*; - -/** - * This class is the key class implementing all the windowing/triggering logic of Apache Beam. - * To provide full compatibility and support for all the windowing/triggering combinations offered by - * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in - * {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}. - * <p/> - * In a nutshell, when the execution arrives at this operator, we expect to have a stream <b>already - * grouped by key</b>. Each element that enters here registers a timer - * (see {@link TimerInternals#setTimer(TimerInternals.TimerData)}) in the - * {@link FlinkGroupAlsoByWindowWrapper#activeTimers}. - * This is essentially a timestamp indicating when to trigger the computation over the window this - * element belongs to. - * <p/> - * When a watermark arrives, all the registered timers are checked to see which ones are ready to - * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from - * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers} - * list, and are fed into the {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn} - * for further processing.
- */ -public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> - extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>> - implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> { - - private static final long serialVersionUID = 1L; - - private transient PipelineOptions options; - - private transient CoderRegistry coderRegistry; - - private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator; - - private ProcessContext context; - - private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy; - - private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn; - - private final KvCoder<K, VIN> inputKvCoder; - - /** - * State is kept <b>per-key</b>. This data structure keeps the mapping between an active key, i.e. a - * key whose elements are currently waiting to be processed, and its associated state. - */ - private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(); - - /** - * Timers waiting to be processed. - */ - private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); - - private FlinkTimerInternals timerInternals = new FlinkTimerInternals(); - - /** - * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy. - * This method assumes that <b>elements are already grouped by key</b>. - * <p/> - * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)} - * is that this method assumes that a combiner function is provided - * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). - * A combiner helps increase the speed and, in most cases, reduce the per-window state. - * - * @param options the general job configuration options. - * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. - * @param groupedStreamByKey the input stream; it is assumed to already be grouped by key. - * @param combiner the combiner to be used. - * @param outputKvCoder the coder for the output values. - */ - public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create( - PipelineOptions options, - PCollection input, - KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey, - Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner, - KvCoder<K, VOUT> outputKvCoder) { - Preconditions.checkNotNull(options); - - KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); - FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options, - input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner); - - Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( - outputKvCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo = - new CoderTypeInformation<>(windowedOutputElemCoder); - - DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey - .transform("GroupByWindowWithCombiner", - new CoderTypeInformation<>(outputKvCoder), - windower) - .returns(outputTypeInfo); - - return groupedByKeyAndWindow; - } - - /** - * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy. - * This method assumes that <b>elements are already grouped by key</b>.
- * <p/> - * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)} - * is that this method assumes no combiner function - * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). - * - * @param options the general job configuration options. - * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. - * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. - */ - public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable( - PipelineOptions options, - PCollection input, - KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) { - Preconditions.checkNotNull(options); - - KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); - - FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options, - input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null); - - Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder); - KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder); - - Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( - outputElemCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo = - new CoderTypeInformation<>(windowedOutputElemCoder); - - DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey - .transform("GroupByWindow", - new CoderTypeInformation<>(windowedOutputElemCoder), - windower) - .returns(outputTypeInfo); - - return groupedByKeyAndWindow; - } - - public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper - createForTesting(PipelineOptions options, - CoderRegistry registry, - WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, - KvCoder<K, VIN> inputCoder, - Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { - Preconditions.checkNotNull(options); - - return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner); - } - - private FlinkGroupAlsoByWindowWrapper(PipelineOptions options, - CoderRegistry registry, - WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, - KvCoder<K, VIN> inputCoder, - Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { - Preconditions.checkNotNull(options); - - this.options = Preconditions.checkNotNull(options); - this.coderRegistry = Preconditions.checkNotNull(registry); - this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder(); - this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy(); - this.combineFn = combiner; - this.operator = createGroupAlsoByWindowOperator(); - this.chainingStrategy = ChainingStrategy.ALWAYS; - } - - @Override - public void open() throws Exception { - super.open(); - this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals); - } - - /** - * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}, - * <b> if not already created</b>. 
- * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then - * a function with that combiner is created, so that elements are combined as they arrive. This is - * done for speed and (in most of the cases) for reduction of the per-window state. - */ - private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() { - if (this.operator == null) { - if (this.combineFn == null) { - // Thus VOUT == Iterable<VIN> - Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); - - this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create( - (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder)); - } else { - Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder(); - - AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn - .withInputCoder(combineFn, coderRegistry, inputKvCoder); - - this.operator = GroupAlsoByWindowViaWindowSetDoFn.create( - (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn)); - } - } - return this.operator; - } - - private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception { - context.setElement(workItem, getStateInternalsForKey(workItem.key())); - - // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded. - operator.startBundle(context); - operator.processElement(context); - operator.finishBundle(context); - } - - @Override - public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception { - ArrayList<WindowedValue<VIN>> elements = new ArrayList<>(); - elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(), - element.getValue().getWindows(), element.getValue().getPane())); - processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements)); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - context.setCurrentInputWatermark(new Instant(mark.getTimestamp())); - - Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); - if (!timers.isEmpty()) { - for (K key : timers.keySet()) { - processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key))); - } - } - - /** - * This is to take into account the different semantics of the Watermark in Flink and - * in Dataflow. To understand the reasoning behind the Dataflow semantics and its - * watermark holding logic, see the documentation of - * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)} - * */ - long millis = Long.MAX_VALUE; - for (FlinkStateInternals state : perKeyStateInternals.values()) { - Instant watermarkHold = state.getWatermarkHold(); - if (watermarkHold != null && watermarkHold.getMillis() < millis) { - millis = watermarkHold.getMillis(); - } - } - - if (mark.getTimestamp() < millis) { - millis = mark.getTimestamp(); - } - - context.setCurrentOutputWatermark(new Instant(millis)); - - // Don't forget to re-emit the watermark for further operators down the line. - // This is critical for jobs with multiple aggregation steps. - // Imagine a job with a groupByKey() on key K1, followed by a map() that changes - // the key K1 to K2, and another groupByKey() on K2. 
In this case, if the watermark - // is not re-emitted, the second aggregation would never be triggered, and no result - // will be produced. - output.emitWatermark(new Watermark(millis)); - } - - @Override - public void close() throws Exception { - super.close(); - } - - private void registerActiveTimer(K key, TimerInternals.TimerData timer) { - Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); - if (timersForKey == null) { - timersForKey = new HashSet<>(); - } - timersForKey.add(timer); - activeTimers.put(key, timersForKey); - } - - private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { - Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); - if (timersForKey != null) { - timersForKey.remove(timer); - if (timersForKey.isEmpty()) { - activeTimers.remove(key); - } else { - activeTimers.put(key, timersForKey); - } - } - } - - /** - * Returns the list of timers that are ready to fire. These are the timers - * that are registered to be triggered at a time before the current watermark. - * We keep these timers in a Set, so that they are deduplicated, as the same - * timer can be registered multiple times. - */ - private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { - - // we keep the timers to return in a different list and launch them later - // because we cannot prevent a trigger from registering another trigger, - // which would lead to concurrent modification exception. - Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create(); - - Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); - - Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); - while (timerIt.hasNext()) { - TimerInternals.TimerData timerData = timerIt.next(); - if (timerData.getTimestamp().isBefore(currentWatermark)) { - toFire.put(keyWithTimers.getKey(), timerData); - timerIt.remove(); - } - } - - if (keyWithTimers.getValue().isEmpty()) { - it.remove(); - } - } - return toFire; - } - - /** - * Gets the state associated with the specified key. - * - * @param key the key whose state we want. - * @return The {@link FlinkStateInternals} - * associated with that key. - */ - private FlinkStateInternals<K> getStateInternalsForKey(K key) { - FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key); - if (stateInternals == null) { - Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); - OutputTimeFn<? 
super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn(); - stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn); - perKeyStateInternals.put(key, stateInternals); - } - return stateInternals; - } - - private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> { - @Override - public void setTimer(TimerData timerKey) { - registerActiveTimer(context.element().key(), timerKey); - } - - @Override - public void deleteTimer(TimerData timerKey) { - unregisterActiveTimer(context.element().key(), timerKey); - } - } - - private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext { - - private final FlinkTimerInternals timerInternals; - - private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector; - - private FlinkStateInternals<K> stateInternals; - - private KeyedWorkItem<K, VIN> element; - - public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function, - TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector, - FlinkTimerInternals timerInternals) { - function.super(); - super.setupDelegateAggregators(); - - this.collector = Preconditions.checkNotNull(outCollector); - this.timerInternals = Preconditions.checkNotNull(timerInternals); - } - - public void setElement(KeyedWorkItem<K, VIN> element, - FlinkStateInternals<K> stateForKey) { - this.element = element; - this.stateInternals = stateForKey; - } - - public void setCurrentInputWatermark(Instant watermark) { - this.timerInternals.setCurrentInputWatermark(watermark); - } - - public void setCurrentOutputWatermark(Instant watermark) { - this.timerInternals.setCurrentOutputWatermark(watermark); - } - - @Override - public KeyedWorkItem<K, VIN> element() { - return this.element; - } - - @Override - public Instant timestamp() { - throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems."); - } - - @Override - public PipelineOptions getPipelineOptions() { - // TODO: PipelineOptions need to be available on the workers. - // Ideally they are captured as part of the pipeline. - // For now, construct empty options so that StateContexts.createFromComponents - // will yield a valid StateContext, which is needed to support the StateContext.window(). - if (options == null) { - options = new PipelineOptions() { - @Override - public <T extends PipelineOptions> T as(Class<T> kls) { - return null; - } - - @Override - public <T extends PipelineOptions> T cloneAs(Class<T> kls) { - return null; - } - - @Override - public Class<? extends PipelineRunner<?>> getRunner() { - return null; - } - - @Override - public void setRunner(Class<? 
extends PipelineRunner<?>> kls) { - - } - - @Override - public CheckEnabled getStableUniqueNames() { - return null; - } - - @Override - public void setStableUniqueNames(CheckEnabled enabled) { - } - }; - } - return options; - } - - @Override - public void output(KV<K, VOUT> output) { - throw new UnsupportedOperationException( - "output() is not available when processing KeyedWorkItems."); - } - - @Override - public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) { - throw new UnsupportedOperationException( - "outputWithTimestamp() is not available when processing KeyedWorkItems."); - } - - @Override - public PaneInfo pane() { - throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems."); - } - - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException( - "window() is not available when processing KeyedWorkItems."); - } - - @Override - public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() { - return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() { - - @Override - public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() { - return stateInternals; - } - - @Override - public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - // TODO: No need to represent timestamp twice. - collector.setAbsoluteTimestamp(timestamp.getMillis()); - collector.collect(WindowedValue.of(output, timestamp, windows, pane)); - - } - - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new UnsupportedOperationException("windows() is not available in Streaming mode."); - } - - @Override - public PaneInfo pane() { - throw new UnsupportedOperationException("pane() is not available in Streaming mode."); - } - - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - throw new RuntimeException("sideInput() is not available in Streaming mode."); - } - }; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - throw new RuntimeException("sideInput() is not supported in Streaming mode."); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - // ignore the side output, this can happen when a user does not register - // side outputs but then outputs using a freshly created TupleTag. 
- throw new RuntimeException("sideOutput() is not available when grouping by window."); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutput(tag, output); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - Accumulator acc = getRuntimeContext().getAccumulator(name); - if (acc != null) { - AccumulatorHelper.compareAccumulatorTypes(name, - SerializableFnAggregatorWrapper.class, acc.getClass()); - return (Aggregator<AggInputT, AggOutputT>) acc; - } - - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = - new SerializableFnAggregatorWrapper<>(combiner); - getRuntimeContext().addAccumulator(name, accumulator); - return accumulator; - } - } - - ////////////// Checkpointing implementation //////////////// - - @Override - public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { - StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); - AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - StateCheckpointWriter writer = StateCheckpointWriter.create(out); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - - // checkpoint the timers - StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder); - - // checkpoint the state - StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder); - - // checkpoint the timerInternals - context.timerInternals.encodeTimerInternals(context, writer, - inputKvCoder, windowingStrategy.getWindowFn().windowCoder()); - - taskState.setOperatorState(out.closeAndGetHandle()); - return taskState; - } - - @Override - public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { - super.restoreState(taskState, recoveryTimestamp); - - final ClassLoader userClassloader = getUserCodeClassloader(); - - Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - - @SuppressWarnings("unchecked") - StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); - DataInputView in = inputState.getState(userClassloader); - StateCheckpointReader reader = new StateCheckpointReader(in); - - // restore the timers - this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); - - // restore the state - this.perKeyStateInternals = StateCheckpointUtils.decodeState( - reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader); - - // restore the timerInternals. 
- this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java deleted file mode 100644 index 1a6a665..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.KV; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.KeyedStream; - -/** - * This class groups the elements by key. It assumes that the incoming stream - * is already composed of <code>[Key,Value]</code> pairs. - * */ -public class FlinkGroupByKeyWrapper { - - /** - * Just an auxiliary interface to bypass the fact that Java anonymous classes cannot implement - * multiple interfaces. - */ - private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> { - } - - public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) { - final Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder); - final boolean isKeyVoid = keyCoder instanceof VoidCoder; - - return inputDataStream.keyBy( - new KeySelectorWithQueryableResultType<K, V>() { - - @Override - public K getKey(WindowedValue<KV<K, V>> value) throws Exception { - return isKeyVoid ?
(K) VoidCoderTypeSerializer.VoidValue.INSTANCE : - value.getValue().getKey(); - } - - @Override - public TypeInformation<K> getProducedType() { - return keyTypeInfo; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java deleted file mode 100644 index df7f953..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingInternals; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.base.Preconditions; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -import java.util.Map; - -/** - * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} Beam transformation. 
- * */ -public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> { - - private final TupleTag<?> mainTag; - private final Map<TupleTag<?>, Integer> outputLabels; - - public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) { - super(options, windowingStrategy, doFn); - this.mainTag = Preconditions.checkNotNull(mainTag); - this.outputLabels = Preconditions.checkNotNull(tagsToLabels); - } - - @Override - public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) { - checkTimestamp(inElement, timestamp); - Integer index = outputLabels.get(mainTag); - collector.collect(makeWindowedValue( - new RawUnionValue(index, output), - timestamp, - inElement.getWindows(), - inElement.getPane())); - } - - @Override - public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) { - checkTimestamp(inElement, timestamp); - Integer index = outputLabels.get(tag); - if (index != null) { - collector.collect(makeWindowedValue( - new RawUnionValue(index, output), - timestamp, - inElement.getWindows(), - inElement.getPane())); - } - } - - @Override - public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) { - throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " + - "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " + - "is not available in this class."); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java deleted file mode 100644 index 2ed5620..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.*; -import com.google.cloud.dataflow.sdk.util.state.StateInternals; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -import java.io.IOException; -import java.util.*; - -/** - * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} Beam transformation. - * */ -public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> { - - public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) { - super(options, windowingStrategy, doFn); - } - - @Override - public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) { - checkTimestamp(inElement, timestamp); - collector.collect(makeWindowedValue( - output, - timestamp, - inElement.getWindows(), - inElement.getPane())); - } - - @Override - public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) { - // ignore the side output, this can happen when a user does not register - // side outputs but then outputs using a freshly created TupleTag. - throw new RuntimeException("sideOutput() not not available in ParDo.Bound()."); - } - - @Override - public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) { - return new WindowingInternals<IN, OUT>() { - @Override - public StateInternals stateInternals() { - throw new NullPointerException("StateInternals are not available for ParDo.Bound()."); - } - - @Override - public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - collector.collect(makeWindowedValue(output, timestamp, windows, pane)); - } - - @Override - public TimerInternals timerInternals() { - throw new NullPointerException("TimeInternals are not available for ParDo.Bound()."); - } - - @Override - public Collection<? 
extends BoundedWindow> windows() { - return inElement.getWindows(); - } - - @Override - public PaneInfo pane() { - return inElement.getPane(); - } - - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode."); - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - throw new RuntimeException("sideInput() not implemented."); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java deleted file mode 100644 index f6c243f..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -import java.io.ByteArrayInputStream; -import java.util.List; - -/** - * This flat map function bootstraps from collection elements and turns them into WindowedValues - * (as required by the Flink runner). 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java deleted file mode 100644 index f6c243f..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -import java.io.ByteArrayInputStream; -import java.util.List; - -/** - * This flat map function decodes the pre-encoded elements of a collection and turns them into - * WindowedValues (as required by the Flink runner). - */ -public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> { - - private final List<byte[]> elements; - private final Coder<OUT> coder; - - public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) { - this.elements = elements; - this.coder = coder; - } - - @Override - public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception { - - @SuppressWarnings("unchecked") - OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE; - for (byte[] element : elements) { - ByteArrayInputStream bai = new ByteArrayInputStream(element); - OUT outValue = coder.decode(bai, Coder.Context.OUTER); - - if (outValue == null) { - // a null decode means the element was encoded by a coder that encodes to null - // (e.g. the VoidCoder); substitute the VoidValue placeholder, since Flink - // cannot handle null records. - out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - } else { - out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - } - } - - out.close(); - } -}
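The function assumes its byte arrays were produced with the very coder it later decodes with. A minimal sketch of the matching encode step, assuming a StringUtf8Coder and made-up data, run inside a method that may throw IOException (this is not the actual translator code that builds the element list):

    // Encode each element in Coder.Context.OUTER, mirroring the decode in flatMap().
    Coder<String> coder = StringUtf8Coder.of();
    List<byte[]> elements = new ArrayList<>();
    for (String value : Arrays.asList("a", "b", "c")) {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      coder.encode(value, bos, Coder.Context.OUTER);
      elements.add(bos.toByteArray());
    }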
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java deleted file mode 100644 index 2857efd..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.common.base.Preconditions; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; - -import javax.annotation.Nullable; -import java.util.List; - -/** - * A wrapper translating Flink sources that implement the {@link RichParallelSourceFunction} interface into - * unbounded Beam sources (see {@link UnboundedSource}). - * */ -public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> { - - private final PipelineOptions options; - private final RichParallelSourceFunction<T> flinkSource; - - public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) { - if (!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } - options = Preconditions.checkNotNull(pipelineOptions); - flinkSource = Preconditions.checkNotNull(source); - validate(); - } - - public RichParallelSourceFunction<T> getFlinkSource() { - return this.flinkSource; - } - - @Override - public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } - - @Override - public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } - - @Nullable - @Override - public Coder<C> getCheckpointMarkCoder() { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } - - - @Override - public void validate() { - Preconditions.checkNotNull(options); - Preconditions.checkNotNull(flinkSource); - if (!options.getRunner().equals(FlinkPipelineRunner.class)) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } - } - - @Override - public Coder<T> getDefaultOutputCoder() { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); - } -}
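Every Source method above throws, so the wrapper is effectively a carrier for getFlinkSource(), which the translation layer presumably unwraps again. An illustrative sketch of wrapping an existing Flink source (the anonymous source below and the `options` instance, which must be configured for the FlinkPipelineRunner, are assumptions of this sketch):

    // Wrap a plain Flink source so the Flink runner can unwrap it at translation time.
    RichParallelSourceFunction<String> flinkSource = new RichParallelSourceFunction<String>() {
      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("hello"); // illustrative single element
      }

      @Override
      public void cancel() {}
    };
    UnboundedFlinkSource<String, UnboundedSource.CheckpointMark> wrapped =
        new UnboundedFlinkSource<>(options, flinkSource);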
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java deleted file mode 100644 index 1389e9d..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Serializable; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Collections; -import java.util.List; -import java.util.NoSuchElementException; - -import static com.google.common.base.Preconditions.checkArgument; - -/** - * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging. - * */ -public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> { - - private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of(); - - private static final long serialVersionUID = 1L; - - private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500; - - private static final int CONNECTION_TIMEOUT_TIME = 0; // 0 means no connect timeout - - private final String hostname; - private final int port; - private final char delimiter; - private final long maxNumRetries; - private final long delayBetweenRetries; - - public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) { - this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP); - } - - public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) { - this.hostname = hostname; - this.port = port; - this.delimiter = delimiter; - this.maxNumRetries = maxNumRetries; - this.delayBetweenRetries = delayBetweenRetries; - } - - public String getHostname() { - return this.hostname; - } - - public int getPort() { - return this.port; - } - - public char getDelimiter() { - return this.delimiter; - } - - public long getMaxNumRetries() { - return this.maxNumRetries; - } - - public long getDelayBetweenRetries() { - return this.delayBetweenRetries; - } - - @Override - public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { - return Collections.<UnboundedSource<String, C>>singletonList(this); - } - - @Override - public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) { - return new UnboundedSocketReader(this); - } - - @Nullable - @Override - public Coder getCheckpointMarkCoder() { - // Flink and Dataflow have different checkpointing mechanisms. - // In our case we do not need a coder. 
- return null; - } - - @Override - public void validate() { - checkArgument(port > 0 && port < 65536, "port is out of range"); - checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)"); - checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive"); - } - - @Override - public Coder getDefaultOutputCoder() { - return DEFAULT_SOCKET_CODER; - } - - public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable { - - private static final long serialVersionUID = 7526472295622776147L; - private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class); - - private final UnboundedSocketSource source; - - private Socket socket; - private BufferedReader reader; - - private boolean isRunning; - - private String currentRecord; - - public UnboundedSocketReader(UnboundedSocketSource source) { - this.source = source; - } - - private void openConnection() throws IOException { - this.socket = new Socket(); - this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME); - this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); - this.isRunning = true; - } - - @Override - public boolean start() throws IOException { - int attempt = 0; - while (!isRunning) { - try { - openConnection(); - LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort()); - - return advance(); - } catch (IOException e) { - LOG.info("Could not connect to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs..."); - - if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) { - try { - Thread.sleep(this.source.getDelayBetweenRetries()); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - } else { - this.isRunning = false; - break; - } - } - } - LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort()); - return false; - } - - @Override - public boolean advance() throws IOException { - final StringBuilder buffer = new StringBuilder(); - int data; - while (isRunning && (data = reader.read()) != -1) { - // check if the string is complete - if (data != this.source.getDelimiter()) { - buffer.append((char) data); - } else { - // strip a trailing carriage return so that \r\n-delimited input also works - if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') { - buffer.setLength(buffer.length() - 1); - } - this.currentRecord = buffer.toString(); - buffer.setLength(0); - return true; - } - } - return false; - } - - @Override - public byte[] getCurrentRecordId() throws NoSuchElementException { - return new byte[0]; - } - - @Override - public String getCurrent() throws NoSuchElementException { - return this.currentRecord; - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return Instant.now(); - } - - @Override - public void close() throws IOException { - this.reader.close(); - this.socket.close(); - this.isRunning = false; - LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + "."); - } - - @Override - public Instant getWatermark() { - return Instant.now(); - } - - @Override - public CheckpointMark getCheckpointMark() { - return null; - } - - @Override - public UnboundedSource<String, ?> getCurrentSource() { - return this.source; - } - } -}
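Since this source is self-contained, a short usage sketch may be helpful. It is illustrative only: the hostname, port, and retry count are made up, and `options` is assumed to be a PipelineOptions instance configured elsewhere (pair it with something like `nc -lk 9999` when experimenting):

    // Read '\n'-delimited lines from localhost:9999, retrying up to 3 times on failure.
    Pipeline p = Pipeline.create(options);
    UnboundedSocketSource<UnboundedSource.CheckpointMark> source =
        new UnboundedSocketSource<>("localhost", 9999, '\n', 3);
    PCollection<String> lines = p.apply(Read.from(source));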
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java deleted file mode 100644 index 97084cf..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.io; - -import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.joda.time.Instant; - -/** - * A wrapper for Beam's unbounded sources. This class wraps the source of a {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} - * transform. - * - * <p> - * For now we support non-parallel, non-checkpointed sources. 
- * */ -public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable { - - private final String name; - private final UnboundedSource.UnboundedReader<T> reader; - - private StreamingRuntimeContext runtime = null; - private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null; - - private volatile boolean isRunning = false; - - public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) { - this.name = transform.getName(); - this.reader = transform.getSource().createReader(options, null); - } - - public String getName() { - return this.name; - } - - WindowedValue<T> makeWindowedValue(T output, Instant timestamp) { - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); - } - - @Override - public void run(SourceContext<WindowedValue<T>> ctx) throws Exception { - if (!(ctx instanceof StreamSource.ManualWatermarkContext)) { - throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunctions. " + - "Apparently " + this.name + " is not. Consider writing your own wrapper for this source."); - } - - context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx; - runtime = (StreamingRuntimeContext) getRuntimeContext(); - - this.isRunning = true; - boolean inputAvailable = reader.start(); - - setNextWatermarkTimer(this.runtime); - - while (isRunning) { - - while (!inputAvailable && isRunning) { - // wait a bit until we retry to pull more records - Thread.sleep(50); - inputAvailable = reader.advance(); - } - - if (inputAvailable) { - - // get it and its timestamp from the source - T item = reader.getCurrent(); - Instant timestamp = reader.getCurrentTimestamp(); - - // write it to the output collector - synchronized (ctx.getCheckpointLock()) { - context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis()); - } - - inputAvailable = reader.advance(); - } - - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public void trigger(long timestamp) throws Exception { - if (this.isRunning) { - synchronized (context.getCheckpointLock()) { - long watermarkMillis = this.reader.getWatermark().getMillis(); - context.emitWatermark(new Watermark(watermarkMillis)); - } - setNextWatermarkTimer(this.runtime); - } - } - - private void setNextWatermarkTimer(StreamingRuntimeContext runtime) { - if (this.isRunning) { - long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval(); - long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval); - runtime.registerTimer(timeToNextWatermark, this); - } - } - - private long getTimeToNextWatermark(long watermarkInterval) { - return System.currentTimeMillis() + watermarkInterval; - } -}
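The watermark plumbing above is a self-rescheduling timer: trigger() emits the reader's current watermark and immediately re-arms itself via setNextWatermarkTimer(). The same pattern, reduced to plain Java as a runnable illustration (the 200 ms interval stands in for getAutoWatermarkInterval(); this is not the Flink timer service itself):

    final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
    final AtomicBoolean running = new AtomicBoolean(true); // plays the role of isRunning
    final long interval = 200;                             // illustrative watermark interval

    Runnable tick = new Runnable() {
      @Override
      public void run() {
        if (running.get()) {
          // stands in for context.emitWatermark(new Watermark(...))
          System.out.println("watermark at " + System.currentTimeMillis());
          // re-arm, like setNextWatermarkTimer() -> registerTimer(now + interval, this)
          timerService.schedule(this, interval, TimeUnit.MILLISECONDS);
        }
      }
    };
    timerService.schedule(tick, interval, TimeUnit.MILLISECONDS);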
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java deleted file mode 100644 index fc75948..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; - -import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.util.TimerInternals; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.KV; -import org.joda.time.Instant; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.io.Serializable; - -/** - * An implementation of Beam's {@link TimerInternals} that also provides serialization functionality. - * The latter is used when snapshots of the current state are taken, for fault tolerance. - * */ -public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable { - private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; - private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; - - public void setCurrentInputWatermark(Instant watermark) { - checkIfValidInputWatermark(watermark); - this.currentInputWatermark = watermark; - } - - public void setCurrentOutputWatermark(Instant watermark) { - checkIfValidOutputWatermark(watermark); - this.currentOutputWatermark = watermark; - } - - private void setCurrentInputWatermarkAfterRecovery(Instant watermark) { - if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) { - throw new RuntimeException("Explicitly setting the input watermark is only allowed on " + - "initialization after recovery from a node failure. Apparently this is not " + - "the case here as the watermark is already set."); - } - this.currentInputWatermark = watermark; - } - - private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) { - if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) { - throw new RuntimeException("Explicitly setting the output watermark is only allowed on " + - "initialization after recovery from a node failure. 
Apparently this is not " + - "the case here as the watermark is already set."); - } - this.currentOutputWatermark = watermark; - } - - @Override - public Instant currentProcessingTime() { - return Instant.now(); - } - - @Override - public Instant currentInputWatermarkTime() { - return currentInputWatermark; - } - - @Nullable - @Override - public Instant currentSynchronizedProcessingTime() { - // TODO - return null; - } - - @Override - public Instant currentOutputWatermarkTime() { - return currentOutputWatermark; - } - - private void checkIfValidInputWatermark(Instant newWatermark) { - if (currentInputWatermark.isAfter(newWatermark)) { - throw new IllegalArgumentException(String.format( - "Cannot set current input watermark to %s. New watermarks " + - "must be no earlier than the current one (%s).", - newWatermark, currentInputWatermark)); - } - } - - private void checkIfValidOutputWatermark(Instant newWatermark) { - if (currentOutputWatermark.isAfter(newWatermark)) { - throw new IllegalArgumentException(String.format( - "Cannot set current output watermark to %s. New watermarks " + - "must be no earlier than the current one (%s).", - newWatermark, currentOutputWatermark)); - } - } - - public void encodeTimerInternals(DoFn.ProcessContext context, - StateCheckpointWriter writer, - KvCoder<K, VIN> kvCoder, - Coder<? extends BoundedWindow> windowCoder) throws IOException { - if (context == null) { - throw new RuntimeException("The Context has not been initialized."); - } - - // only the two watermarks are written; kvCoder and windowCoder are currently unused here - writer.setTimestamp(currentInputWatermark); - writer.setTimestamp(currentOutputWatermark); - } - - public void restoreTimerInternals(StateCheckpointReader reader, - KvCoder<K, VIN> kvCoder, - Coder<? extends BoundedWindow> windowCoder) throws IOException { - setCurrentInputWatermarkAfterRecovery(reader.getTimestamp()); - setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp()); - } -}
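Taken together, encodeTimerInternals() and restoreTimerInternals() define a small round-trip contract: the two timestamps must be read back in the order they were written, and only into a freshly constructed instance, since the recovery setters refuse to overwrite an already-advanced watermark. A purely illustrative sketch, in which `timerInternals` and `freshTimerInternals` stand for instances of a hypothetical concrete subclass, and `context`, `writer`, `reader`, `kvCoder`, and `windowCoder` for objects supplied by the surrounding operator:

    // Snapshot side: advance the watermarks, then persist them (input first, output second).
    timerInternals.setCurrentInputWatermark(new Instant(42L));  // illustrative values
    timerInternals.setCurrentOutputWatermark(new Instant(17L));
    timerInternals.encodeTimerInternals(context, writer, kvCoder, windowCoder);

    // Recovery side: read them back in the same order, on a freshly created instance.
    freshTimerInternals.restoreTimerInternals(reader, kvCoder, windowCoder);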