http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
new file mode 100644
index 0000000..2efaad3
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
+ * {@link PTransform}.
+ *
+ * <p>Every element of an input bundle is copied unchanged into one output bundle of the
+ * flattened {@link PCollection}.
+ */
+class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    // Unchecked: the application is treated as a Flatten whose inputs and output hold InputT.
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<InputT> flattenEvaluator =
+        (TransformEvaluator<InputT>) createInMemoryEvaluator(
+            (AppliedPTransform) application, inputBundle, evaluationContext);
+    return flattenEvaluator;
+  }
+
+  /**
+   * Builds the evaluator for a single input bundle. When {@code inputBundle} is {@code null} the
+   * evaluator creates no output bundle and its result reports no outputs.
+   */
+  private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
+      AppliedPTransform<
+              PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
+          application,
+      CommittedBundle<InputT> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    StepTransformResult.Builder resultBuilder = StepTransformResult.withoutHold(application);
+    UncommittedBundle<InputT> outputBundle = null;
+    if (inputBundle != null) {
+      outputBundle = evaluationContext.createBundle(inputBundle, application.getOutput());
+      resultBuilder.addOutput(outputBundle);
+    }
+    // A Flatten with no input bundle is the output of
+    // Flatten.pcollections(PCollectionList.empty()). No element can be processed in that case, so
+    // the null output bundle is never written to.
+    return new FlattenEvaluator<>(outputBundle, resultBuilder.build());
+  }
+
+  /** Forwards each processed element into the output bundle and reports a precomputed result. */
+  private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> {
+    private final UncommittedBundle<InputT> target;
+    private final InProcessTransformResult finalResult;
+
+    public FlattenEvaluator(
+        UncommittedBundle<InputT> target, InProcessTransformResult finalResult) {
+      this.target = target;
+      this.finalResult = finalResult;
+    }
+
+    @Override
+    public void processElement(WindowedValue<InputT> element) {
+      target.add(element);
+    }
+
+    @Override
+    public InProcessTransformResult finishBundle() {
+      return finalResult;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
new file mode 100644
index 0000000..3160b58
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TypedPValue;
+
+/**
+ * A base class for implementing {@link PTransform} overrides, which behave identically to the
+ * delegate transform but with overridden methods. Implementors are required to implement
+ * {@link #delegate()}, which returns the object to forward calls to, and may override any other
+ * method (most commonly {@link #apply(PInput)}) whose behavior should differ from the delegate.
+ */
+public abstract class ForwardingPTransform<InputT extends PInput, OutputT extends POutput>
+    extends PTransform<InputT, OutputT> {
+  /** Returns the transform to which every method that is not overridden forwards its call. */
+  protected abstract PTransform<InputT, OutputT> delegate();
+
+  @Override
+  public OutputT apply(InputT input) {
+    return delegate().apply(input);
+  }
+
+  @Override
+  public void validate(InputT input) {
+    delegate().validate(input);
+  }
+
+  @Override
+  public String getName() {
+    return delegate().getName();
+  }
+
+  @Override
+  public <T> Coder<T> getDefaultOutputCoder(InputT input, TypedPValue<T> output)
+      throws CannotProvideCoderException {
+    return delegate().getDefaultOutputCoder(input, output);
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    delegate().populateDisplayData(builder);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
new file mode 100644
index 0000000..874ec17
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.StepTransformResult.Builder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey}
+ * {@link PTransform}.
+ */
+class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<InputT> evaluator = createEvaluator(
+        (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
+      final AppliedPTransform<
+              PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+              InProcessGroupByKeyOnly<K, V>>
+          application,
+      final CommittedBundle<KV<K, V>> inputBundle,
+      final InProcessEvaluationContext evaluationContext) {
+    return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application);
+  }
+
+  /**
+   * Buffers the elements of one input bundle, grouped by the encoded form of their key, and when
+   * the bundle finishes emits one {@link KeyedWorkItem} per key into its own keyed output bundle.
+   */
+  private static class GroupByKeyEvaluator<K, V>
+      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
+    private final InProcessEvaluationContext evaluationContext;
+
+    private final CommittedBundle<KV<K, V>> inputBundle;
+    private final AppliedPTransform<
+            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+            InProcessGroupByKeyOnly<K, V>>
+        application;
+    private final Coder<K> keyCoder;
+    private final Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
+
+    public GroupByKeyEvaluator(
+        InProcessEvaluationContext evaluationContext,
+        CommittedBundle<KV<K, V>> inputBundle,
+        AppliedPTransform<
+                PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+                InProcessGroupByKeyOnly<K, V>>
+            application) {
+      this.evaluationContext = evaluationContext;
+      this.inputBundle = inputBundle;
+      this.application = application;
+
+      PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
+      keyCoder = getKeyCoder(input.getCoder());
+      groupingMap = new HashMap<>();
+    }
+
+    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
+      if (!(coder instanceof KvCoder)) {
+        throw new IllegalStateException(
+            String.format(
+                "Expected the input of %s to be encoded with a KvCoder, but its coder was %s",
+                application.getFullName(), coder));
+      }
+      @SuppressWarnings("unchecked")
+      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
+      return keyCoder;
+    }
+
+    @Override
+    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
+      KV<K, WindowedValue<V>> kv = element.getValue();
+      K key = kv.getKey();
+      byte[] encodedKey;
+      try {
+        encodedKey = encodeToByteArray(keyCoder, key);
+      } catch (CoderException exn) {
+        // TODO: Put in better element printing:
+        // truncate if too long.
+        throw new IllegalArgumentException(
+            String.format(
+                "unable to encode key %s of input to %s using %s",
+                key, application.getFullName(), keyCoder),
+            exn);
+      }
+      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
+      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
+      if (values == null) {
+        values = new ArrayList<WindowedValue<V>>();
+        groupingMap.put(groupingKey, values);
+      }
+      values.add(kv.getValue());
+    }
+
+    @Override
+    public InProcessTransformResult finishBundle() {
+      Builder resultBuilder = StepTransformResult.withoutHold(application);
+      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
+          groupingMap.entrySet()) {
+        K key = groupedEntry.getKey().key;
+        KeyedWorkItem<K, V> groupedKv =
+            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
+        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
+            evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
+        bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
+        resultBuilder.addOutput(bundle);
+      }
+      return resultBuilder.build();
+    }
+
+    /**
+     * A key paired with its encoded form. Equality and hashing use only the encoded bytes, so
+     * grouping follows the key coder's encoding rather than the key type's {@code equals}.
+     */
+    private static class GroupingKey<K> {
+      private final K key;
+      private final byte[] encodedKey;
+
+      public GroupingKey(K key, byte[] encodedKey) {
+        this.key = key;
+        this.encodedKey = encodedKey;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (o instanceof GroupingKey) {
+          GroupingKey<?> that = (GroupingKey<?>) o;
+          return Arrays.equals(this.encodedKey, that.encodedKey);
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return Arrays.hashCode(encodedKey);
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
+   */
+  public static final class InProcessGroupByKeyOverrideFactory
+      implements PTransformOverrideFactory {
+    @Override
+    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+        PTransform<InputT, OutputT> transform) {
+      if (transform instanceof GroupByKey) {
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        PTransform<InputT, OutputT> override = new InProcessGroupByKey((GroupByKey) transform);
+        return override;
+      }
+      return transform;
+    }
+  }
+
+  /**
+   * An in-memory implementation of the {@link GroupByKey} primitive as a composite
+   * {@link PTransform}.
+   */
+  private static final class InProcessGroupByKey<K, V>
+      extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+    private final GroupByKey<K, V> original;
+
+    private InProcessGroupByKey(GroupByKey<K, V> from) {
+      this.original = from;
+    }
+
+    @Override
+    public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
+      return original;
+    }
+
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
+
+      // This operation groups by the combination of key and window,
+      // merging windows as needed, using the windows assigned to the
+      // key/value input elements and the window merge operation of the
+      // window function associated with the input PCollection.
+      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+      // Use the default GroupAlsoByWindow implementation
+      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow =
+          groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder());
+
+      // By default, implement GroupByKey via a series of lower-level operations.
+      return input
+          // Make each input element's timestamp and assigned windows
+          // explicit, in the value part.
+          .apply(new ReifyTimestampsAndWindows<K, V>())
+
+          .apply(new InProcessGroupByKeyOnly<K, V>())
+          .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(),
+              inputCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()))
+
+          // Group each key's values by window, merging windows as needed.
+          .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
+
+          // And update the windowing strategy as appropriate.
+          .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
+          .setCoder(
+              KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
+    }
+
+    private <W extends BoundedWindow>
+        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(
+            final WindowingStrategy<?, W> windowingStrategy, final Coder<V> inputCoder) {
+      return GroupAlsoByWindowViaWindowSetDoFn.create(
+          windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
+    }
+  }
+
+  /**
+   * An implementation primitive to use in the evaluation of a {@link GroupByKey}
+   * {@link PTransform}.
+   */
+  public static final class InProcessGroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
+    @Override
+    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
+      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+
+    @VisibleForTesting
+    InProcessGroupByKeyOnly() {}
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
new file mode 100644
index 0000000..2103ad3
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.MutationDetector;
+import org.apache.beam.sdk.util.MutationDetectors;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import org.joda.time.Instant;
+
+/**
+ * A {@link BundleFactory} that ensures that elements added to it are not mutated after being
+ * output. Immutability checks are enforced at the time {@link UncommittedBundle#commit(Instant)} is
+ * called, checking the value at that time against the value at the time the element was added. All
+ * elements added to the bundle will be encoded by the {@link Coder} of the underlying
+ * {@link PCollection}.
+ *
+ * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
+ * after it is added to an output {@link PCollection}.
+ */
+class ImmutabilityCheckingBundleFactory implements BundleFactory {
+  /**
+   * Create a new {@link ImmutabilityCheckingBundleFactory} that uses the underlying
+   * {@link BundleFactory} to create the output bundle.
+   */
+  public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) {
+    return new ImmutabilityCheckingBundleFactory(underlying);
+  }
+
+  private final BundleFactory underlying;
+
+  private ImmutabilityCheckingBundleFactory(BundleFactory underlying) {
+    this.underlying = checkNotNull(underlying);
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, output));
+  }
+
+  @Override
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
+  }
+
+  /**
+   * Wraps an {@link UncommittedBundle}, recording a {@link MutationDetector} for every added
+   * element and verifying all of them before delegating {@link #commit(Instant)}.
+   */
+  private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> {
+    private final UncommittedBundle<T> underlying;
+    private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors;
+    private final Coder<T> coder;
+
+    public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) {
+      this.underlying = underlying;
+      mutationDetectors = HashMultimap.create();
+      coder = getPCollection().getCoder();
+    }
+
+    @Override
+    public PCollection<T> getPCollection() {
+      return underlying.getPCollection();
+    }
+
+    @Override
+    public UncommittedBundle<T> add(WindowedValue<T> element) {
+      try {
+        mutationDetectors.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+      } catch (CoderException e) {
+        throw Throwables.propagate(e);
+      }
+      underlying.add(element);
+      return this;
+    }
+
+    @Override
+    public CommittedBundle<T> commit(Instant synchronizedProcessingTime) {
+      for (MutationDetector detector : mutationDetectors.values()) {
+        try {
+          detector.verifyUnmodified();
+        } catch (IllegalMutationException exn) {
+          throw UserCodeException.wrap(
+              new IllegalMutationException(
+                  String.format(
+                      "PTransform %s mutated value %s after it was output (new value was %s)."
+                          + " Values must not be mutated in any way after being output.",
+                      underlying.getPCollection().getProducingTransformInternal().getFullName(),
+                      exn.getSavedValue(),
+                      exn.getNewValue()),
+                  exn.getSavedValue(),
+                  exn.getNewValue(),
+                  exn));
+        }
+      }
+      return underlying.commit(synchronizedProcessingTime);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
new file mode 100644
index 0000000..bfecc9d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.MutationDetector;
+import org.apache.beam.sdk.util.MutationDetectors;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * {@link ModelEnforcement} that enforces elements are not modified over the course of processing
+ * an element.
+ *
+ * <p>Implies encodability enforcement, since every element is passed through the input
+ * {@link Coder} by {@link MutationDetectors#forValueWithCoder} before it is processed.
+ */
+class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static ModelEnforcementFactory create() {
+    return new ImmutabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new ImmutabilityCheckingEnforcement<T>(input, consumer);
+  }
+
+  private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
+    private final AppliedPTransform<?, ?, ?> transform;
+    // Keyed by identity so afterElement can still find an element whose contents were mutated.
+    private final Map<WindowedValue<T>, MutationDetector> mutationElements;
+    private final Coder<T> coder;
+
+    private ImmutabilityCheckingEnforcement(
+        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
+      this.transform = transform;
+      coder = input.getPCollection().getCoder();
+      mutationElements = new IdentityHashMap<>();
+    }
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        mutationElements.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+      } catch (CoderException e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    @Override
+    public void afterElement(WindowedValue<T> element) {
+      verifyUnmodified(mutationElements.get(element));
+    }
+
+    @Override
+    public void afterFinish(
+        CommittedBundle<T> input,
+        InProcessTransformResult result,
+        Iterable<? extends CommittedBundle<?>> outputs) {
+      for (MutationDetector detector : mutationElements.values()) {
+        verifyUnmodified(detector);
+      }
+    }
+
+    private void verifyUnmodified(MutationDetector detector) {
+      try {
+        detector.verifyUnmodified();
+      } catch (IllegalMutationException e) {
+        throw new IllegalMutationException(
+            String.format(
+                "PTransform %s illegally mutated value %s of class %s."
+                    + " Input values must not be mutated in any way.",
+                transform.getFullName(),
+                e.getSavedValue(),
+                e.getSavedValue().getClass()),
+            e.getSavedValue(),
+            e.getNewValue(),
+            e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
new file mode 100644
index 0000000..07b6bb4
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -0,0 +1,1327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.collect.SortedMultiset; +import com.google.common.collect.TreeMultiset; + +import org.joda.time.Instant; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; 
+import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nullable; + +/** + * Manages watermarks of {@link PCollection PCollections} and input and output watermarks of + * {@link AppliedPTransform AppliedPTransforms} to provide event-time and completion tracking for + * in-memory execution. {@link InMemoryWatermarkManager} is designed to update and return a + * consistent view of watermarks in the presence of concurrent updates. + * + * <p>An {@link InMemoryWatermarkManager} is provided with the collection of root + * {@link AppliedPTransform AppliedPTransforms} and a map of {@link PCollection PCollections} to + * all the {@link AppliedPTransform AppliedPTransforms} that consume them at construction time. + * + * <p>Whenever a root {@link AppliedPTransform transform} produces elements, the + * {@link InMemoryWatermarkManager} is provided with the produced elements and the output watermark + * of the producing {@link AppliedPTransform transform}. The + * {@link InMemoryWatermarkManager watermark manager} is responsible for computing the watermarks + * of all {@link AppliedPTransform transforms} that consume one or more + * {@link PCollection PCollections}. 
+ * + * <p>Whenever a non-root {@link AppliedPTransform} finishes processing one or more in-flight + * elements (referred to as the input {@link CommittedBundle bundle}), the following occurs + * atomically: + * <ul> + * <li>All of the in-flight elements are removed from the collection of pending elements for the + * {@link AppliedPTransform}.</li> + * <li>All of the elements produced by the {@link AppliedPTransform} are added to the collection + * of pending elements for each {@link AppliedPTransform} that consumes them.</li> + * <li>The input watermark for the {@link AppliedPTransform} becomes the maximum value of + * <ul> + * <li>the previous input watermark</li> + * <li>the minimum of + * <ul> + * <li>the timestamps of all currently pending elements</li> + * <li>all input {@link PCollection} watermarks</li> + * </ul> + * </li> + * </ul> + * </li> + * <li>The output watermark for the {@link AppliedPTransform} becomes the maximum of + * <ul> + * <li>the previous output watermark</li> + * <li>the minimum of + * <ul> + * <li>the current input watermark</li> + * <li>the current watermark holds</li> + * </ul> + * </li> + * </ul> + * </li> + * <li>The watermark of the output {@link PCollection} can be advanced to the output watermark of + * the {@link AppliedPTransform}</li> + * <li>The watermark of all downstream {@link AppliedPTransform AppliedPTransforms} can be + * advanced.</li> + * </ul> + * + * <p>The watermark of a {@link PCollection} is equal to the output watermark of the + * {@link AppliedPTransform} that produces it. 
+ * + * <p>The watermarks for a {@link PTransform} are updated as follows when output is committed:<pre> + * Watermark_In' = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection))) + * Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(StateHold))) + * Watermark_PCollection = Watermark_Out_ProducingPTransform + * </pre> + */ +public class InMemoryWatermarkManager { + /** + * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a + * {@link PCollection}. + * + * <p>A watermark is a monotonically increasing value, which represents the point up to which the + * system believes it has received all of the data. Data that arrives with a timestamp that is + * before the watermark is considered late. {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is a special + * timestamp which indicates we have received all of the data and there will be no more on-time or + * late data. This value is represented by {@link InMemoryWatermarkManager#THE_END_OF_TIME}. + */ + private static interface Watermark { + /** + * Returns the current value of this watermark. + */ + Instant get(); + + /** + * Refreshes the value of this watermark from its input watermarks and watermark holds. + * + * @return true if the value of the watermark has changed (and thus dependent watermark must + * also be updated + */ + WatermarkUpdate refresh(); + } + + /** + * The result of computing a {@link Watermark}. + */ + private static enum WatermarkUpdate { + /** The watermark is later than the value at the previous time it was computed. */ + ADVANCED(true), + /** The watermark is equal to the value at the previous time it was computed. */ + NO_CHANGE(false); + + private final boolean advanced; + + private WatermarkUpdate(boolean advanced) { + this.advanced = advanced; + } + + public boolean isAdvanced() { + return advanced; + } + + /** + * Returns the {@link WatermarkUpdate} that is a result of combining the two watermark updates. 
+ * + * If either of the input {@link WatermarkUpdate WatermarkUpdates} were advanced, the result + * {@link WatermarkUpdate} has been advanced. + */ + public WatermarkUpdate union(WatermarkUpdate that) { + if (this.advanced) { + return this; + } + return that; + } + + /** + * Returns the {@link WatermarkUpdate} based on the former and current + * {@link Instant timestamps}. + */ + public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) { + if (currentTime.isAfter(oldTime)) { + return ADVANCED; + } + return NO_CHANGE; + } + } + + /** + * The input {@link Watermark} of an {@link AppliedPTransform}. + * + * <p>At any point, the value of an {@link AppliedPTransformInputWatermark} is equal to the + * minimum watermark across all of its input {@link Watermark Watermarks}, and the minimum + * timestamp of all of the pending elements, restricted to be monotonically increasing. + * + * <p>See {@link #refresh()} for more information. + */ + private static class AppliedPTransformInputWatermark implements Watermark { + private final Collection<? extends Watermark> inputWatermarks; + private final SortedMultiset<WindowedValue<?>> pendingElements; + private final Map<Object, NavigableSet<TimerData>> objectTimers; + + private AtomicReference<Instant> currentWatermark; + + public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) { + this.inputWatermarks = inputWatermarks; + this.pendingElements = TreeMultiset.create(new WindowedValueByTimestampComparator()); + this.objectTimers = new HashMap<>(); + currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); + } + + @Override + public Instant get() { + return currentWatermark.get(); + } + + /** + * {@inheritDoc}. 
+ * + * <p>When refresh is called, the value of the {@link AppliedPTransformInputWatermark} becomes + * equal to the maximum value of + * <ul> + * <li>the previous input watermark</li> + * <li>the minimum of + * <ul> + * <li>the timestamps of all currently pending elements</li> + * <li>all input {@link PCollection} watermarks</li> + * </ul> + * </li> + * </ul> + */ + @Override + public synchronized WatermarkUpdate refresh() { + Instant oldWatermark = currentWatermark.get(); + Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (Watermark inputWatermark : inputWatermarks) { + minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get()); + } + if (!pendingElements.isEmpty()) { + minInputWatermark = INSTANT_ORDERING.min( + minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp()); + } + Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark); + currentWatermark.set(newWatermark); + return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark); + } + + private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) { + for (WindowedValue<?> pendingElement : newPending) { + pendingElements.add(pendingElement); + } + } + + private synchronized void removePendingElements( + Iterable<? 
extends WindowedValue<?>> finishedElements) { + for (WindowedValue<?> finishedElement : finishedElements) { + pendingElements.remove(finishedElement); + } + } + + private synchronized void updateTimers(TimerUpdate update) { + NavigableSet<TimerData> keyTimers = objectTimers.get(update.key); + if (keyTimers == null) { + keyTimers = new TreeSet<>(); + objectTimers.put(update.key, keyTimers); + } + for (TimerData timer : update.setTimers) { + if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { + keyTimers.add(timer); + } + } + for (TimerData timer : update.deletedTimers) { + if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { + keyTimers.remove(timer); + } + } + // We don't keep references to timers that have been fired and delivered via #getFiredTimers() + } + + private synchronized Map<Object, List<TimerData>> extractFiredEventTimeTimers() { + return extractFiredTimers(currentWatermark.get(), objectTimers); + } + + @Override + public synchronized String toString() { + return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class) + .add("pendingElements", pendingElements) + .add("currentWatermark", currentWatermark) + .toString(); + } + } + + /** + * The output {@link Watermark} of an {@link AppliedPTransform}. + * + * <p>The value of an {@link AppliedPTransformOutputWatermark} is equal to the minimum of the + * current watermark hold and the {@link AppliedPTransformInputWatermark} for the same + * {@link AppliedPTransform}, restricted to be monotonically increasing. See + * {@link #refresh()} for more information. 
+ */ + private static class AppliedPTransformOutputWatermark implements Watermark { + private final Watermark inputWatermark; + private final PerKeyHolds holds; + private AtomicReference<Instant> currentWatermark; + + public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) { + this.inputWatermark = inputWatermark; + holds = new PerKeyHolds(); + currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); + } + + public synchronized void updateHold(Object key, Instant newHold) { + if (newHold == null) { + holds.removeHold(key); + } else { + holds.updateHold(key, newHold); + } + } + + @Override + public Instant get() { + return currentWatermark.get(); + } + + /** + * {@inheritDoc}. + * + * <p>When refresh is called, the value of the {@link AppliedPTransformOutputWatermark} becomes + * equal to the maximum value of: + * <ul> + * <li>the previous output watermark</li> + * <li>the minimum of + * <ul> + * <li>the current input watermark</li> + * <li>the current watermark holds</li> + * </ul> + * </li> + * </ul> + */ + @Override + public synchronized WatermarkUpdate refresh() { + Instant oldWatermark = currentWatermark.get(); + Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold()); + newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark); + currentWatermark.set(newWatermark); + return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark); + } + + @Override + public synchronized String toString() { + return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class) + .add("holds", holds) + .add("currentWatermark", currentWatermark) + .toString(); + } + } + + /** + * The input {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an + * {@link AppliedPTransform}. 
+ * + * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeInputWatermark} is equal + * to the minimum across all pending bundles at the {@link AppliedPTransform} and all upstream + * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the input + * synchronized processing time at any step is equal to the maximum of: + * <ul> + * <li>The most recently returned synchronized processing input time + * <li>The minimum of + * <ul> + * <li>The current processing time + * <li>The current synchronized processing time input hold + * </ul> + * </ul> + */ + private static class SynchronizedProcessingTimeInputWatermark implements Watermark { + private final Collection<? extends Watermark> inputWms; + private final Collection<CommittedBundle<?>> pendingBundles; + private final Map<Object, NavigableSet<TimerData>> processingTimers; + private final Map<Object, NavigableSet<TimerData>> synchronizedProcessingTimers; + + private final PriorityQueue<TimerData> pendingTimers; + + private AtomicReference<Instant> earliestHold; + + public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) { + this.inputWms = inputWms; + this.pendingBundles = new HashSet<>(); + this.processingTimers = new HashMap<>(); + this.synchronizedProcessingTimers = new HashMap<>(); + this.pendingTimers = new PriorityQueue<>(); + Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (Watermark wm : inputWms) { + initialHold = INSTANT_ORDERING.min(initialHold, wm.get()); + } + earliestHold = new AtomicReference<>(initialHold); + } + + @Override + public Instant get() { + return earliestHold.get(); + } + + /** + * {@inheritDoc}. 
+ * + * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeInputWatermark} + * becomes equal to the minimum value of + * <ul> + * <li>the timestamps of all currently pending bundles</li> + * <li>all input {@link PCollection} synchronized processing time watermarks</li> + * </ul> + * + * <p>Note that this value is not monotonic, but the returned value for the synchronized + * processing time must be. + */ + @Override + public synchronized WatermarkUpdate refresh() { + Instant oldHold = earliestHold.get(); + Instant minTime = THE_END_OF_TIME.get(); + for (Watermark input : inputWms) { + minTime = INSTANT_ORDERING.min(minTime, input.get()); + } + for (CommittedBundle<?> bundle : pendingBundles) { + // TODO: Track elements in the bundle by the processing time they were output instead of + // entire bundles. Requried to support arbitrarily splitting and merging bundles between + // steps + minTime = INSTANT_ORDERING.min(minTime, bundle.getSynchronizedProcessingOutputWatermark()); + } + earliestHold.set(minTime); + return WatermarkUpdate.fromTimestamps(oldHold, minTime); + } + + public synchronized void addPending(CommittedBundle<?> bundle) { + pendingBundles.add(bundle); + } + + public synchronized void removePending(CommittedBundle<?> bundle) { + pendingBundles.remove(bundle); + } + + /** + * Return the earliest timestamp of the earliest timer that has not been completed. This is + * either the earliest timestamp across timers that have not been completed, or the earliest + * timestamp across timers that have been delivered but have not been completed. 
+ */ + public synchronized Instant getEarliestTimerTimestamp() { + Instant earliest = THE_END_OF_TIME.get(); + for (NavigableSet<TimerData> timers : processingTimers.values()) { + if (!timers.isEmpty()) { + earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); + } + } + for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) { + if (!timers.isEmpty()) { + earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); + } + } + if (!pendingTimers.isEmpty()) { + earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest); + } + return earliest; + } + + private synchronized void updateTimers(TimerUpdate update) { + for (TimerData completedTimer : update.completedTimers) { + pendingTimers.remove(completedTimer); + } + Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key); + for (TimerData addedTimer : update.setTimers) { + NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain()); + if (timerQueue != null) { + timerQueue.add(addedTimer); + } + } + for (TimerData deletedTimer : update.deletedTimers) { + NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain()); + if (timerQueue != null) { + timerQueue.remove(deletedTimer); + } + } + } + + private synchronized Map<Object, List<TimerData>> extractFiredDomainTimers( + TimeDomain domain, Instant firingTime) { + Map<Object, List<TimerData>> firedTimers; + switch (domain) { + case PROCESSING_TIME: + firedTimers = extractFiredTimers(firingTime, processingTimers); + break; + case SYNCHRONIZED_PROCESSING_TIME: + firedTimers = + extractFiredTimers( + INSTANT_ORDERING.min(firingTime, earliestHold.get()), + synchronizedProcessingTimers); + break; + default: + throw new IllegalArgumentException( + "Called getFiredTimers on a Synchronized Processing Time watermark" + + " and gave a non-processing time domain " + + domain); + } + for (Map.Entry<Object, ? 
extends Collection<TimerData>> firedTimer : firedTimers.entrySet()) { + pendingTimers.addAll(firedTimer.getValue()); + } + return firedTimers; + } + + private Map<TimeDomain, NavigableSet<TimerData>> timerMap(Object key) { + NavigableSet<TimerData> processingQueue = processingTimers.get(key); + if (processingQueue == null) { + processingQueue = new TreeSet<>(); + processingTimers.put(key, processingQueue); + } + NavigableSet<TimerData> synchronizedProcessingQueue = + synchronizedProcessingTimers.get(key); + if (synchronizedProcessingQueue == null) { + synchronizedProcessingQueue = new TreeSet<>(); + synchronizedProcessingTimers.put(key, synchronizedProcessingQueue); + } + EnumMap<TimeDomain, NavigableSet<TimerData>> result = new EnumMap<>(TimeDomain.class); + result.put(TimeDomain.PROCESSING_TIME, processingQueue); + result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue); + return result; + } + + @Override + public synchronized String toString() { + return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class) + .add("earliestHold", earliestHold) + .toString(); + } + } + + /** + * The output {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an + * {@link AppliedPTransform}. + * + * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeOutputWatermark} is + * equal to the minimum across all incomplete timers at the {@link AppliedPTransform} and all + * upstream {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. 
The value of the output + * synchronized processing time at any step is equal to the maximum of: + * <ul> + * <li>The most recently returned synchronized processing output time + * <li>The minimum of + * <ul> + * <li>The current processing time + * <li>The current synchronized processing time output hold + * </ul> + * </ul> + */ + private static class SynchronizedProcessingTimeOutputWatermark implements Watermark { + private final SynchronizedProcessingTimeInputWatermark inputWm; + private AtomicReference<Instant> latestRefresh; + + public SynchronizedProcessingTimeOutputWatermark( + SynchronizedProcessingTimeInputWatermark inputWm) { + this.inputWm = inputWm; + this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); + } + + @Override + public Instant get() { + return latestRefresh.get(); + } + + /** + * {@inheritDoc}. + * + * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeOutputWatermark} + * becomes equal to the minimum value of: + * <ul> + * <li>the current input watermark. + * <li>all {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} timers that are based on the input + * watermark. + * <li>all {@link TimeDomain#PROCESSING_TIME} timers that are based on the input watermark. + * </ul> + * + * <p>Note that this value is not monotonic, but the returned value for the synchronized + * processing time must be. + */ + @Override + public synchronized WatermarkUpdate refresh() { + // Hold the output synchronized processing time to the input watermark, which takes into + // account buffered bundles, and the earliest pending timer, which determines what to hold + // downstream timers to. 
+ Instant oldRefresh = latestRefresh.get(); + Instant newTimestamp = + INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp()); + latestRefresh.set(newTimestamp); + return WatermarkUpdate.fromTimestamps(oldRefresh, newTimestamp); + } + + @Override + public synchronized String toString() { + return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class) + .add("latestRefresh", latestRefresh) + .toString(); + } + } + + /** + * The {@code Watermark} that is after the latest time it is possible to represent in the global + * window. This is a distinguished value representing a complete {@link PTransform}. + */ + private static final Watermark THE_END_OF_TIME = new Watermark() { + @Override + public WatermarkUpdate refresh() { + // THE_END_OF_TIME is a distinguished value that cannot be advanced. + return WatermarkUpdate.NO_CHANGE; + } + + @Override + public Instant get() { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + }; + + private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural(); + + /** + * A function that takes a WindowedValue and returns the exploded representation of that + * {@link WindowedValue}. + */ + private static final Function<WindowedValue<?>, ? extends Iterable<? extends WindowedValue<?>>> + EXPLODE_WINDOWS_FN = + new Function<WindowedValue<?>, Iterable<? extends WindowedValue<?>>>() { + @Override + public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> input) { + return input.explodeWindows(); + } + }; + + /** + * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the + * latestTime argument and put in in the result with the same key, then remove all of the keys + * which have no more pending timers. + * + * The result collection retains ordering of timers (from earliest to latest). 
+ */ + private static Map<Object, List<TimerData>> extractFiredTimers( + Instant latestTime, Map<Object, NavigableSet<TimerData>> objectTimers) { + Map<Object, List<TimerData>> result = new HashMap<>(); + Set<Object> emptyKeys = new HashSet<>(); + for (Map.Entry<Object, NavigableSet<TimerData>> pendingTimers : objectTimers.entrySet()) { + NavigableSet<TimerData> timers = pendingTimers.getValue(); + if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) { + ArrayList<TimerData> keyFiredTimers = new ArrayList<>(); + result.put(pendingTimers.getKey(), keyFiredTimers); + while (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) { + keyFiredTimers.add(timers.first()); + timers.remove(timers.first()); + } + } + if (timers.isEmpty()) { + emptyKeys.add(pendingTimers.getKey()); + } + } + objectTimers.keySet().removeAll(emptyKeys); + return result; + } + + //////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * The {@link Clock} providing the current time in the {@link TimeDomain#PROCESSING_TIME} domain. + */ + private final Clock clock; + + /** + * A map from each {@link PCollection} to all {@link AppliedPTransform PTransform applications} + * that consume that {@link PCollection}. + */ + private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers; + + /** + * The input and output watermark of each {@link AppliedPTransform}. + */ + private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks; + + /** + * Creates a new {@link InMemoryWatermarkManager}. All watermarks within the newly created + * {@link InMemoryWatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the + * minimum watermark, with no watermark holds or pending elements. 
+ * + * @param rootTransforms the root-level transforms of the {@link Pipeline} + * @param consumers a mapping between each {@link PCollection} in the {@link Pipeline} to the + * transforms that consume it as a part of their input + */ + public static InMemoryWatermarkManager create( + Clock clock, + Collection<AppliedPTransform<?, ?, ?>> rootTransforms, + Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) { + return new InMemoryWatermarkManager(clock, rootTransforms, consumers); + } + + private InMemoryWatermarkManager( + Clock clock, + Collection<AppliedPTransform<?, ?, ?>> rootTransforms, + Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) { + this.clock = clock; + this.consumers = consumers; + + transformToWatermarks = new HashMap<>(); + + for (AppliedPTransform<?, ?, ?> rootTransform : rootTransforms) { + getTransformWatermark(rootTransform); + } + for (Collection<AppliedPTransform<?, ?, ?>> intermediateTransforms : consumers.values()) { + for (AppliedPTransform<?, ?, ?> transform : intermediateTransforms) { + getTransformWatermark(transform); + } + } + } + + private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) { + TransformWatermarks wms = transformToWatermarks.get(transform); + if (wms == null) { + List<Watermark> inputCollectionWatermarks = getInputWatermarks(transform); + AppliedPTransformInputWatermark inputWatermark = + new AppliedPTransformInputWatermark(inputCollectionWatermarks); + AppliedPTransformOutputWatermark outputWatermark = + new AppliedPTransformOutputWatermark(inputWatermark); + + SynchronizedProcessingTimeInputWatermark inputProcessingWatermark = + new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform)); + SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark = + new SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark); + + wms = + new TransformWatermarks( + inputWatermark, outputWatermark, inputProcessingWatermark, 
outputProcessingWatermark); + transformToWatermarks.put(transform, wms); + } + return wms; + } + + private Collection<Watermark> getInputProcessingWatermarks( + AppliedPTransform<?, ?, ?> transform) { + ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder(); + Collection<? extends PValue> inputs = transform.getInput().expand(); + if (inputs.isEmpty()) { + inputWmsBuilder.add(THE_END_OF_TIME); + } + for (PValue pvalue : inputs) { + Watermark producerOutputWatermark = + getTransformWatermark(pvalue.getProducingTransformInternal()) + .synchronizedProcessingOutputWatermark; + inputWmsBuilder.add(producerOutputWatermark); + } + return inputWmsBuilder.build(); + } + + private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) { + ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder(); + Collection<? extends PValue> inputs = transform.getInput().expand(); + if (inputs.isEmpty()) { + inputWatermarksBuilder.add(THE_END_OF_TIME); + } + for (PValue pvalue : inputs) { + Watermark producerOutputWatermark = + getTransformWatermark(pvalue.getProducingTransformInternal()).outputWatermark; + inputWatermarksBuilder.add(producerOutputWatermark); + } + List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build(); + return inputCollectionWatermarks; + } + + //////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Gets the input and output watermarks for an {@link AppliedPTransform}. If the + * {@link AppliedPTransform PTransform} has not processed any elements, return a watermark of + * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. + * + * @return a snapshot of the input watermark and output watermark for the provided transform + */ + public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) { + return transformToWatermarks.get(transform); + } + + /** + * Updates the watermarks of a transform with one or more inputs. 
+ * + * <p>Each transform has two monotonically increasing watermarks: the input watermark, which can, + * at any time, be updated to equal: + * <pre> + * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks)) + * </pre> + * and the output watermark, which can, at any time, be updated to equal: + * <pre> + * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds)) + * </pre>. + * + * @param completed the input that has completed + * @param transform the transform that has completed processing the input + * @param outputs the bundles the transform has output + * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there + * is no hold + */ + public void updateWatermarks( + @Nullable CommittedBundle<?> completed, + AppliedPTransform<?, ?, ?> transform, + TimerUpdate timerUpdate, + Iterable<? extends CommittedBundle<?>> outputs, + @Nullable Instant earliestHold) { + updatePending(completed, transform, timerUpdate, outputs); + TransformWatermarks transformWms = transformToWatermarks.get(transform); + transformWms.setEventTimeHold(completed == null ? null : completed.getKey(), earliestHold); + refreshWatermarks(transform); + } + + private void refreshWatermarks(AppliedPTransform<?, ?, ?> transform) { + TransformWatermarks myWatermarks = transformToWatermarks.get(transform); + WatermarkUpdate updateResult = myWatermarks.refresh(); + if (updateResult.isAdvanced()) { + for (PValue outputPValue : transform.getOutput().expand()) { + Collection<AppliedPTransform<?, ?, ?>> downstreamTransforms = consumers.get(outputPValue); + if (downstreamTransforms != null) { + for (AppliedPTransform<?, ?, ?> downstreamTransform : downstreamTransforms) { + refreshWatermarks(downstreamTransform); + } + } + } + } + } + + /** + * Removes all of the completed Timers from the collection of pending timers, adds all new timers, + * and removes all deleted timers. 
Removes all elements consumed by the input bundle from the + * {@link PTransform PTransforms} collection of pending elements, and adds all elements produced + * by the {@link PTransform} to the pending queue of each consumer. + */ + private void updatePending( + CommittedBundle<?> input, + AppliedPTransform<?, ?, ?> transform, + TimerUpdate timerUpdate, + Iterable<? extends CommittedBundle<?>> outputs) { + TransformWatermarks completedTransform = transformToWatermarks.get(transform); + completedTransform.updateTimers(timerUpdate); + if (input != null) { + completedTransform.removePending(input); + } + + for (CommittedBundle<?> bundle : outputs) { + for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) { + TransformWatermarks watermarks = transformToWatermarks.get(consumer); + watermarks.addPending(bundle); + } + } + } + + /** + * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the + * pending timers will be removed from this {@link InMemoryWatermarkManager}. + */ + public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() { + Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> allTimers = new HashMap<>(); + for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry : + transformToWatermarks.entrySet()) { + Map<Object, FiredTimers> keyFiredTimers = watermarksEntry.getValue().extractFiredTimers(); + if (!keyFiredTimers.isEmpty()) { + allTimers.put(watermarksEntry.getKey(), keyFiredTimers); + } + } + return allTimers; + } + + /** + * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global, + * and as such the watermark manager must track holds and the release of holds on a per-key basis. + * + * <p>The {@link #compareTo(KeyedHold)} method of {@link KeyedHold} is not consistent with equals, + * as the key is arbitrarily ordered via identity, rather than object equality. 
+ */ + private static final class KeyedHold implements Comparable<KeyedHold> { + private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast(); + + private final Object key; + private final Instant timestamp; + + /** + * Create a new KeyedHold with the specified key and timestamp. + */ + public static KeyedHold of(Object key, Instant timestamp) { + return new KeyedHold(key, MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get())); + } + + private KeyedHold(Object key, Instant timestamp) { + this.key = key; + this.timestamp = timestamp; + } + + @Override + public int compareTo(KeyedHold that) { + return ComparisonChain.start() + .compare(this.timestamp, that.timestamp) + .compare(this.key, that.key, KEY_ORDERING) + .result(); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, key); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof KeyedHold)) { + return false; + } + KeyedHold that = (KeyedHold) other; + return Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.key, that.key); + } + + /** + * Get the value of this {@link KeyedHold}. + */ + public Instant getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(KeyedHold.class) + .add("key", key) + .add("hold", timestamp) + .toString(); + } + } + + private static class PerKeyHolds { + private final Map<Object, KeyedHold> keyedHolds; + private final PriorityQueue<KeyedHold> allHolds; + + private PerKeyHolds() { + this.keyedHolds = new HashMap<>(); + this.allHolds = new PriorityQueue<>(); + } + + /** + * Gets the minimum hold across all keys in this {@link PerKeyHolds}, or THE_END_OF_TIME if + * there are no holds within this {@link PerKeyHolds}. + */ + public Instant getMinHold() { + return allHolds.isEmpty() ? 
THE_END_OF_TIME.get() : allHolds.peek().getTimestamp(); + } + + /** + * Updates the hold of the provided key to the provided value, removing any other holds for + * the same key. + */ + public void updateHold(@Nullable Object key, Instant newHold) { + removeHold(key); + KeyedHold newKeyedHold = KeyedHold.of(key, newHold); + keyedHolds.put(key, newKeyedHold); + allHolds.offer(newKeyedHold); + } + + /** + * Removes the hold of the provided key. + */ + public void removeHold(Object key) { + KeyedHold oldHold = keyedHolds.get(key); + if (oldHold != null) { + allHolds.remove(oldHold); + } + } + } + + /** + * A reference to the input and output watermarks of an {@link AppliedPTransform}. + */ + public class TransformWatermarks { + private final AppliedPTransformInputWatermark inputWatermark; + private final AppliedPTransformOutputWatermark outputWatermark; + + private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark; + private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark; + + private Instant latestSynchronizedInputWm; + private Instant latestSynchronizedOutputWm; + + private TransformWatermarks( + AppliedPTransformInputWatermark inputWatermark, + AppliedPTransformOutputWatermark outputWatermark, + SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark, + SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) { + this.inputWatermark = inputWatermark; + this.outputWatermark = outputWatermark; + + this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark; + this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark; + this.latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE; + this.latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + /** + * Returns the input watermark of the {@link AppliedPTransform}. 
+ */ + public Instant getInputWatermark() { + return Preconditions.checkNotNull(inputWatermark.get()); + } + + /** + * Returns the output watermark of the {@link AppliedPTransform}. + */ + public Instant getOutputWatermark() { + return outputWatermark.get(); + } + + /** + * Returns the synchronized processing input time of the {@link AppliedPTransform}. + * + * <p>The returned value is guaranteed to be monotonically increasing, and outside of the + * presence of holds, will increase as the system time progresses. + */ + public synchronized Instant getSynchronizedProcessingInputTime() { + latestSynchronizedInputWm = INSTANT_ORDERING.max( + latestSynchronizedInputWm, + INSTANT_ORDERING.min(clock.now(), synchronizedProcessingInputWatermark.get())); + return latestSynchronizedInputWm; + } + + /** + * Returns the synchronized processing output time of the {@link AppliedPTransform}. + * + * <p>The returned value is guaranteed to be monotonically increasing, and outside of the + * presence of holds, will increase as the system time progresses. 
+ */ + public synchronized Instant getSynchronizedProcessingOutputTime() { + latestSynchronizedOutputWm = INSTANT_ORDERING.max( + latestSynchronizedOutputWm, + INSTANT_ORDERING.min(clock.now(), synchronizedProcessingOutputWatermark.get())); + return latestSynchronizedOutputWm; + } + + private WatermarkUpdate refresh() { + inputWatermark.refresh(); + synchronizedProcessingInputWatermark.refresh(); + WatermarkUpdate eventOutputUpdate = outputWatermark.refresh(); + WatermarkUpdate syncOutputUpdate = synchronizedProcessingOutputWatermark.refresh(); + return eventOutputUpdate.union(syncOutputUpdate); + } + + private void setEventTimeHold(Object key, Instant newHold) { + outputWatermark.updateHold(key, newHold); + } + + private void removePending(CommittedBundle<?> bundle) { + inputWatermark.removePendingElements(elementsFromBundle(bundle)); + synchronizedProcessingInputWatermark.removePending(bundle); + } + + private void addPending(CommittedBundle<?> bundle) { + inputWatermark.addPendingElements(elementsFromBundle(bundle)); + synchronizedProcessingInputWatermark.addPending(bundle); + } + + private Iterable<? 
extends WindowedValue<?>> elementsFromBundle(CommittedBundle<?> bundle) { + return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN); + } + + private Map<Object, FiredTimers> extractFiredTimers() { + Map<Object, List<TimerData>> eventTimeTimers = inputWatermark.extractFiredEventTimeTimers(); + Map<Object, List<TimerData>> processingTimers; + Map<Object, List<TimerData>> synchronizedTimers; + if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( + TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE); + synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( + TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE); + } else { + processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( + TimeDomain.PROCESSING_TIME, clock.now()); + synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime()); + } + Map<Object, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>(); + groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers); + + Map<Object, FiredTimers> keyFiredTimers = new HashMap<>(); + for (Map.Entry<Object, Map<TimeDomain, List<TimerData>>> firedTimers : + groupedTimers.entrySet()) { + keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue())); + } + return keyFiredTimers; + } + + @SafeVarargs + private final void groupFiredTimers( + Map<Object, Map<TimeDomain, List<TimerData>>> groupedToMutate, + Map<Object, List<TimerData>>... 
timersToGroup) { + for (Map<Object, List<TimerData>> subGroup : timersToGroup) { + for (Map.Entry<Object, List<TimerData>> newTimers : subGroup.entrySet()) { + Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey()); + if (grouped == null) { + grouped = new HashMap<>(); + groupedToMutate.put(newTimers.getKey(), grouped); + } + grouped.put(newTimers.getValue().get(0).getDomain(), newTimers.getValue()); + } + } + } + + private void updateTimers(TimerUpdate update) { + inputWatermark.updateTimers(update); + synchronizedProcessingInputWatermark.updateTimers(update); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(TransformWatermarks.class) + .add("inputWatermark", inputWatermark) + .add("outputWatermark", outputWatermark) + .add("inputProcessingTime", synchronizedProcessingInputWatermark) + .add("outputProcessingTime", synchronizedProcessingOutputWatermark) + .toString(); + } + } + + /** + * A collection of newly set, deleted, and completed timers. + * + * <p>setTimers and deletedTimers are collections of {@link TimerData} that have been added to the + * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as + * the input to the executed step. + */ + public static class TimerUpdate { + private final Object key; + private final Iterable<? extends TimerData> completedTimers; + + private final Iterable<? extends TimerData> setTimers; + private final Iterable<? extends TimerData> deletedTimers; + + /** + * Returns a TimerUpdate for a null key with no timers. + */ + public static TimerUpdate empty() { + return new TimerUpdate( + null, + Collections.<TimerData>emptyList(), + Collections.<TimerData>emptyList(), + Collections.<TimerData>emptyList()); + } + + /** + * Creates a new {@link TimerUpdate} builder with the provided completed timers that needs the + * set and deleted timers to be added to it. 
+ */ + public static TimerUpdateBuilder builder(Object key) { + return new TimerUpdateBuilder(key); + } + + /** + * A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers. + */ + public static final class TimerUpdateBuilder { + private final Object key; + private final Collection<TimerData> completedTimers; + private final Collection<TimerData> setTimers; + private final Collection<TimerData> deletedTimers; + + private TimerUpdateBuilder(Object key) { + this.key = key; + this.completedTimers = new HashSet<>(); + this.setTimers = new HashSet<>(); + this.deletedTimers = new HashSet<>(); + } + + /** + * Adds all of the provided timers to the collection of completed timers, and returns this + * {@link TimerUpdateBuilder}. + */ + public TimerUpdateBuilder withCompletedTimers(Iterable<TimerData> completedTimers) { + Iterables.addAll(this.completedTimers, completedTimers); + return this; + } + + /** + * Adds the provided timer to the collection of set timers, removing it from deleted timers if + * it has previously been deleted. Returns this {@link TimerUpdateBuilder}. + */ + public TimerUpdateBuilder setTimer(TimerData setTimer) { + deletedTimers.remove(setTimer); + setTimers.add(setTimer); + return this; + } + + /** + * Adds the provided timer to the collection of deleted timers, removing it from set timers if + * it has previously been set. Returns this {@link TimerUpdateBuilder}. + */ + public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) { + deletedTimers.add(deletedTimer); + setTimers.remove(deletedTimer); + return this; + } + + /** + * Returns a new {@link TimerUpdate} with the most recently set completedTimers, setTimers, + * and deletedTimers. + */ + public TimerUpdate build() { + return new TimerUpdate( + key, + ImmutableSet.copyOf(completedTimers), + ImmutableSet.copyOf(setTimers), + ImmutableSet.copyOf(deletedTimers)); + } + } + + private TimerUpdate( + Object key, + Iterable<? 
extends TimerData> completedTimers, + Iterable<? extends TimerData> setTimers, + Iterable<? extends TimerData> deletedTimers) { + this.key = key; + this.completedTimers = completedTimers; + this.setTimers = setTimers; + this.deletedTimers = deletedTimers; + } + + @VisibleForTesting + Object getKey() { + return key; + } + + @VisibleForTesting + Iterable<? extends TimerData> getCompletedTimers() { + return completedTimers; + } + + @VisibleForTesting + Iterable<? extends TimerData> getSetTimers() { + return setTimers; + } + + @VisibleForTesting + Iterable<? extends TimerData> getDeletedTimers() { + return deletedTimers; + } + + /** + * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers. + */ + public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) { + return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers); + } + + @Override + public int hashCode() { + return Objects.hash(key, completedTimers, setTimers, deletedTimers); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof TimerUpdate)) { + return false; + } + TimerUpdate that = (TimerUpdate) other; + return Objects.equals(this.key, that.key) + && Objects.equals(this.completedTimers, that.completedTimers) + && Objects.equals(this.setTimers, that.setTimers) + && Objects.equals(this.deletedTimers, that.deletedTimers); + } + } + + /** + * A pair of {@link TimerData} and key which can be delivered to the appropriate + * {@link AppliedPTransform}. A timer fires at the transform that set it with a specific key when + * the time domain in which it lives progresses past a specified time, as determined by the + * {@link InMemoryWatermarkManager}. + */ + public static class FiredTimers { + private final Map<TimeDomain, ? extends Collection<TimerData>> timers; + + private FiredTimers(Map<TimeDomain, ? 
extends Collection<TimerData>> timers) { + this.timers = timers; + } + + /** + * Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers + * fired within the provided domain, return an empty collection. + * + * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp. + */ + public Collection<TimerData> getTimers(TimeDomain domain) { + Collection<TimerData> domainTimers = timers.get(domain); + if (domainTimers == null) { + return Collections.emptyList(); + } + return domainTimers; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(FiredTimers.class).add("timers", timers).toString(); + } + } + + private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> { + @Override + public int compare(WindowedValue<?> o1, WindowedValue<?> o2) { + return ComparisonChain.start() + .compare(o1.getTimestamp(), o2.getTimestamp()) + .result(); + } + } + + public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() { + Set<AppliedPTransform<?, ?, ?>> result = new HashSet<>(); + for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> wms : + transformToWatermarks.entrySet()) { + if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) { + result.add(wms.getKey()); + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java new file mode 100644 index 0000000..bc9b04c --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; + +import javax.annotation.Nullable; + +/** + * A factory that produces bundles that perform no additional validation. 
+ */ +class InProcessBundleFactory implements BundleFactory { + public static InProcessBundleFactory create() { + return new InProcessBundleFactory(); + } + + private InProcessBundleFactory() {} + + @Override + public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { + return InProcessBundle.create(output, null); + } + + @Override + public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) { + return InProcessBundle.create(output, input.getKey()); + } + + @Override + public <T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, @Nullable Object key, PCollection<T> output) { + return InProcessBundle.create(output, key); + } + + /** + * A {@link UncommittedBundle} that buffers elements in memory. + */ + private static final class InProcessBundle<T> implements UncommittedBundle<T> { + private final PCollection<T> pcollection; + @Nullable private final Object key; + private boolean committed = false; + private ImmutableList.Builder<WindowedValue<T>> elements; + + /** + * Create a new {@link InProcessBundle} for the specified {@link PCollection}. 
+ */ + public static <T> InProcessBundle<T> create(PCollection<T> pcollection, @Nullable Object key) { + return new InProcessBundle<T>(pcollection, key); + } + + private InProcessBundle(PCollection<T> pcollection, Object key) { + this.pcollection = pcollection; + this.key = key; + this.elements = ImmutableList.builder(); + } + + @Override + public PCollection<T> getPCollection() { + return pcollection; + } + + @Override + public InProcessBundle<T> add(WindowedValue<T> element) { + checkState( + !committed, + "Can't add element %s to committed bundle in PCollection %s", + element, + pcollection); + elements.add(element); + return this; + } + + @Override + public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) { + checkState(!committed, "Can't commit already committed bundle %s", this); + committed = true; + final Iterable<WindowedValue<T>> committedElements = elements.build(); + return new CommittedInProcessBundle<>( + pcollection, key, committedElements, synchronizedCompletionTime); + } + } + + private static class CommittedInProcessBundle<T> implements CommittedBundle<T> { + public CommittedInProcessBundle( + PCollection<T> pcollection, + Object key, + Iterable<WindowedValue<T>> committedElements, + Instant synchronizedCompletionTime) { + this.pcollection = pcollection; + this.key = key; + this.committedElements = committedElements; + this.synchronizedCompletionTime = synchronizedCompletionTime; + } + + private final PCollection<T> pcollection; + private final Object key; + private final Iterable<WindowedValue<T>> committedElements; + private final Instant synchronizedCompletionTime; + + @Override + @Nullable + public Object getKey() { + return key; + } + + @Override + public Iterable<WindowedValue<T>> getElements() { + return committedElements; + } + + @Override + public PCollection<T> getPCollection() { + return pcollection; + } + + @Override + public Instant getSynchronizedProcessingOutputWatermark() { + return synchronizedCompletionTime; + 
} + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); + } + + @Override + public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) { + return new CommittedInProcessBundle<>( + pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime); + } + } +}