BEAM-261 Apex runner PoC
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aaf38ddf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aaf38ddf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aaf38ddf Branch: refs/heads/apex-runner Commit: aaf38ddfe53bbb67fad4456ee1068d18b9b891b5 Parents: 49f9444 Author: Thomas Weise <t...@apache.org> Authored: Mon Jun 27 11:24:13 2016 -0700 Committer: Thomas Weise <t...@apache.org> Committed: Sun Oct 16 23:21:55 2016 -0700 ---------------------------------------------------------------------- runners/apex/pom.xml | 226 ++++++++++ .../beam/runners/apex/ApexPipelineOptions.java | 60 +++ .../runners/apex/ApexPipelineTranslator.java | 134 ++++++ .../apache/beam/runners/apex/ApexRunner.java | 171 ++++++++ .../beam/runners/apex/ApexRunnerResult.java | 85 ++++ .../beam/runners/apex/TestApexRunner.java | 56 +++ .../translators/CreateValuesTranslator.java | 49 +++ .../FlattenPCollectionTranslator.java | 52 +++ .../apex/translators/GroupByKeyTranslator.java | 41 ++ .../apex/translators/ParDoBoundTranslator.java | 43 ++ .../translators/ReadUnboundedTranslator.java | 42 ++ .../apex/translators/TransformTranslator.java | 31 ++ .../apex/translators/TranslationContext.java | 143 +++++++ .../functions/ApexGroupByKeyOperator.java | 427 +++++++++++++++++++ .../functions/ApexParDoOperator.java | 177 ++++++++ .../io/ApexReadUnboundedInputOperator.java | 125 ++++++ .../apex/translators/io/ValuesSource.java | 152 +++++++ .../apex/translators/utils/ApexStreamTuple.java | 191 +++++++++ .../utils/CoderAdapterStreamCodec.java | 73 ++++ .../translators/utils/NoOpSideInputReader.java | 47 ++ .../apex/translators/utils/NoOpStepContext.java | 73 ++++ .../utils/SerializablePipelineOptions.java | 61 +++ .../apex/examples/StreamingWordCountTest.java | 120 ++++++ .../apex/examples/UnboundedTextSource.java | 144 +++++++ .../FlattenPCollectionTranslatorTest.java | 97 +++++ 
.../translators/GroupByKeyTranslatorTest.java | 248 +++++++++++ .../translators/ParDoBoundTranslatorTest.java | 164 +++++++ .../translators/ReadUnboundTranslatorTest.java | 130 ++++++ .../translators/utils/CollectionSource.java | 137 ++++++ .../translators/utils/PipelineOptionsTest.java | 82 ++++ .../apex/src/test/resources/log4j.properties | 33 ++ runners/pom.xml | 1 + 32 files changed, 3615 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml new file mode 100644 index 0000000..bb08b3c --- /dev/null +++ b/runners/apex/pom.xml @@ -0,0 +1,226 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-parent</artifactId> + <version>0.3.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-runners-apex_3.4.0</artifactId> + + <name>Apache Beam :: Runners :: Apex</name> + + <packaging>jar</packaging> + + <properties> + <apex.core.version>3.4.0</apex.core.version> + <apex.malhar.version>3.4.0</apex.malhar.version> + <skipIntegrationTests>true</skipIntegrationTests> + <!-- memory limit for embedded cluster --> + <surefire.args>-Xmx2048m</surefire.args> + </properties> + + <dependencies> + <!-- Apex dependencies --> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-common</artifactId> + <version>${apex.core.version}</version> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-library</artifactId> + <version>${apex.malhar.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + <version>${apex.core.version}</version> + <scope>test</scope> + </dependency> + + <!--- Beam --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-java</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + 
</dependency> + + <dependency> + <!-- javax.annotation.Nullable --> + <groupId>com.google.code.findbugs</groupId> + <artifactId>annotations</artifactId> + </dependency> + + <!-- Test scoped --> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <!-- Depend on test jar to scan for RunnableOnService tests --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!--dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-examples-java</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency--> + <!-- Optional Pipeline Registration --> + <!--dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency--> + </dependencies> + + <build> + <plugins> + + <!-- Checkstyle errors for now + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + --> + + <!-- Integration Tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + + <!-- Unit Tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <argLine>${surefire.args}</argLine> + </configuration> + <executions> + <execution> + 
<id>runnable-on-service-tests</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <groups>org.apache.beam.sdk.testing.RunnableOnService</groups> + <parallel>none</parallel> + <failIfNoTests>true</failIfNoTests> + <dependenciesToScan> + <dependency>org.apache.beam:beam-sdks-java-core</dependency> + </dependenciesToScan> + <systemPropertyVariables> + <beamTestPipelineOptions> + [ + "--runner=org.apache.beam.runners.apex.TestApexRunner", + "--streaming=true" + ] + </beamTestPipelineOptions> + </systemPropertyVariables> + <skipTests>${skipIntegrationTests}</skipTests> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>analyze-only</goal> + </goals> + <configuration> + <ignoredUsedUndeclaredDependencies> + <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.4.0</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.2.1</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.2.0</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency> + </ignoredUsedUndeclaredDependencies> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + +</project> 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java new file mode 100644 index 0000000..f70d24c --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Options that configure the Apex pipeline. 
+ */ +public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializable { + + @Description("set unique application name for Apex runner") + void setApplicationName(String name); + + String getApplicationName(); + + @Description("set parallelism for Apex runner") + void setParallelism(int parallelism); + + @Default.Integer(1) + int getParallelism(); + + @Description("execute the pipeline with embedded cluster") + void setEmbeddedExecution(boolean embedded); + + @Default.Boolean(true) + boolean isEmbeddedExecution(); + + @Description("configure embedded execution with debug friendly options") + void setEmbeddedExecutionDebugMode(boolean embeddedDebug); + + @Default.Boolean(true) + boolean isEmbeddedExecutionDebugMode(); + + @Description("how long the client should wait for the pipeline to run") + void setRunMillis(long runMillis); + + @Default.Long(0) + long getRunMillis(); + +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java new file mode 100644 index 0000000..8ea7139 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex; + +import org.apache.beam.runners.apex.translators.CreateValuesTranslator; +import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator; +import org.apache.beam.runners.apex.translators.GroupByKeyTranslator; +import org.apache.beam.runners.apex.translators.ParDoBoundTranslator; +import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator; +import org.apache.beam.runners.apex.translators.TransformTranslator; +import org.apache.beam.runners.apex.translators.TranslationContext; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * {@link ApexPipelineTranslator} translates {@link Pipeline} objects + * into Apex logical plan {@link DAG}. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { + + private static final Logger LOG = LoggerFactory.getLogger( + ApexPipelineTranslator.class); + + /** + * A map from {@link PTransform} subclass to the corresponding + * {@link TransformTranslator} to use to translate that transform. 
+ */ + private static final Map<Class<? extends PTransform>, TransformTranslator> + transformTranslators = new HashMap<>(); + + private final TranslationContext translationContext; + + static { + // register TransformTranslators + registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator()); + registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); + registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); + registerTransformTranslator(Flatten.FlattenPCollectionList.class, + new FlattenPCollectionTranslator()); + registerTransformTranslator(Create.Values.class, new CreateValuesTranslator()); + } + + public ApexPipelineTranslator(TranslationContext translationContext) { + this.translationContext = translationContext; + } + + public void translate(Pipeline pipeline) { + pipeline.traverseTopologically(this); + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + LOG.debug("entering composite transform {}", node.getTransform()); + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + LOG.debug("leaving composite transform {}", node.getTransform()); + } + + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + LOG.debug("visiting transform {}", node.getTransform()); + PTransform transform = node.getTransform(); + TransformTranslator translator = getTransformTranslator(transform.getClass()); + if (null == translator) { + throw new IllegalStateException( + "no translator registered for " + transform); + } + translationContext.setCurrentTransform(node); + translator.translate(transform, translationContext); + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + LOG.debug("visiting value {}", value); + } + + /** + * Records that instances of the specified PTransform class + * should be translated by default by the corresponding + * {@link 
TransformTranslator}. + */ + private static <TransformT extends PTransform> void registerTransformTranslator( + Class<TransformT> transformClass, + TransformTranslator<? extends TransformT> transformTranslator) { + if (transformTranslators.put(transformClass, transformTranslator) != null) { + throw new IllegalArgumentException( + "defining multiple translators for " + transformClass); + } + } + + /** + * Returns the {@link TransformTranslator} to use for instances of the + * specified PTransform class, or null if none registered. + */ + private <TransformT extends PTransform<?,?>> + TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) { + return transformTranslators.get(transformClass); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java new file mode 100644 index 0000000..87c8f97 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.runners.apex.translators.TranslationContext; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.AssignWindows; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.google.common.base.Throwables; + +/** + * A {@link PipelineRunner} that translates the + * pipeline to an Apex DAG and executes it on an Apex cluster. + * <p> + * Currently execution is always in embedded mode, + * launch on Hadoop cluster will be added in subsequent iteration. 
+ */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ApexRunner extends PipelineRunner<ApexRunnerResult> { + + private final ApexPipelineOptions options; + + public ApexRunner(ApexPipelineOptions options) { + this.options = options; + } + + public static ApexRunner fromOptions(PipelineOptions options) { + return new ApexRunner((ApexPipelineOptions) options); + } + + @Override + public <OutputT extends POutput, InputT extends PInput> OutputT apply( + PTransform<InputT, OutputT> transform, InputT input) { + if (Window.Bound.class.equals(transform.getClass())) { + return (OutputT) ((PCollection) input).apply( + new AssignWindowsAndSetStrategy((Window.Bound) transform)); + } else if (Create.Values.class.equals(transform.getClass())) { + return (OutputT) PCollection + .<OutputT>createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.BOUNDED); + } else if (Read.Bounded.class.equals(transform.getClass())) { + return (OutputT) ((PBegin) input).apply(new UnboundedReadFromBoundedSource<>(((Read.Bounded)transform).getSource())); + } else { + return super.apply(transform, input); + } + } + + @Override + public ApexRunnerResult run(Pipeline pipeline) { + + final TranslationContext translationContext = new TranslationContext(options); + ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext); + translator.translate(pipeline); + + StreamingApplication apexApp = new StreamingApplication() + { + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName()); + translationContext.populateDAG(dag); + } + }; + + checkArgument(options.isEmbeddedExecution(), "only embedded execution is supported at this time"); + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + try { + lma.prepareDAG(apexApp, conf); + LocalMode.Controller lc = lma.getController(); + if 
(options.isEmbeddedExecutionDebugMode()) { + // turns off timeout checking for operator progress + lc.setHeartbeatMonitoringEnabled(false); + } + if (options.getRunMillis() > 0) { + lc.run(options.getRunMillis()); + } else { + lc.runAsync(); + } + return new ApexRunnerResult(lma.getDAG(), lc); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + /** + * copied from DirectPipelineRunner. + * used to replace Window.Bound till equivalent function is added in Apex + */ + private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow> + extends PTransform<PCollection<T>, PCollection<T>> { + + private final Window.Bound<T> wrapped; + + public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) { + this.wrapped = wrapped; + } + + @Override + public PCollection<T> apply(PCollection<T> input) { + WindowingStrategy<?, ?> outputStrategy = + wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); + + WindowFn<T, BoundedWindow> windowFn = + (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); + + // If the Window.Bound transform only changed parts other than the WindowFn, then + // we skip AssignWindows even though it should be harmless in a perfect world. + // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly + // crash if another GBK is performed without explicitly setting the WindowFn. So we skip + // AssignWindows in this case. 
+ if (wrapped.getWindowFn() == null) { + return input.apply("Identity", ParDo.of(new IdentityFn<T>())) + .setWindowingStrategyInternal(outputStrategy); + } else { + return input + .apply("AssignWindows", new AssignWindows<>(windowFn)) + .setWindowingStrategyInternal(outputStrategy); + } + } + } + + private static class IdentityFn<T> extends DoFn<T, T> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java new file mode 100644 index 0000000..f28c8dc --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; + +import java.io.IOException; + +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; +import org.apache.beam.sdk.transforms.Aggregator; +import org.joda.time.Duration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; + +/** + * Result of executing a {@link Pipeline} with Apex in embedded mode. + */ +public class ApexRunnerResult implements PipelineResult { + private final DAG apexDAG; + private final LocalMode.Controller ctrl; + private State state = State.UNKNOWN; + + public ApexRunnerResult(DAG dag, LocalMode.Controller ctrl) { + this.apexDAG = dag; + this.ctrl = ctrl; + } + + @Override + public State getState() { + return state; + } + + @Override + public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) + throws AggregatorRetrievalException { + return null; + } + + @Override + public State cancel() throws IOException + { + ctrl.shutdown(); + state = State.CANCELLED; + return state; + } + + @Override + public State waitUntilFinish(Duration duration) throws IOException, InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public State waitUntilFinish() throws IOException, InterruptedException + { + throw new UnsupportedOperationException(); + } + + /** + * Return the DAG executed by the pipeline. 
+ * @return + */ + public DAG getApexDAG() { + return apexDAG; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java new file mode 100644 index 0000000..45c143e --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + + +public class TestApexRunner extends PipelineRunner<ApexRunnerResult> { + + private ApexRunner delegate; + + private TestApexRunner(ApexPipelineOptions options) { + options.setEmbeddedExecution(true); + //options.setEmbeddedExecutionDebugMode(false); + options.setRunMillis(20000); + this.delegate = ApexRunner.fromOptions(options); + } + + public static TestApexRunner fromOptions(PipelineOptions options) { + ApexPipelineOptions apexOptions = PipelineOptionsValidator.validate(ApexPipelineOptions.class, options); + return new TestApexRunner(apexOptions); + } + + @Override + public <OutputT extends POutput, InputT extends PInput> + OutputT apply(PTransform<InputT,OutputT> transform, InputT input) { + return delegate.apply(transform, input); + } + + @Override + public ApexRunnerResult run(Pipeline pipeline) { + return delegate.run(pipeline); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java new file mode 100644 index 0000000..387b19f --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.apex.translators.io.ValuesSource; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.Create; + +import com.google.common.base.Throwables; + + +/** + * Wraps elements from Create.Values into an {@link UnboundedSource}. 
+ * mainly used for test + */ +public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> { + private static final long serialVersionUID = 1451000241832745629L; + + @Override + public void translate(Create.Values<T> transform, TranslationContext context) { + try { + UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(), + transform.getDefaultOutputCoder(context.getInput())); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(unboundedSource, + context.getPipelineOptions()); + context.addOperator(operator, operator.output); + } catch (CannotProvideCoderException e) { + Throwables.propagate(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java new file mode 100644 index 0000000..f228149 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; + +import com.datatorrent.lib.stream.StreamMerger; + +/** + * Flatten.FlattenPCollectionList translation to Apex operator. + * TODO: support more than two streams + */ +public class FlattenPCollectionTranslator<T> implements + TransformTranslator<Flatten.FlattenPCollectionList<T>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { + StreamMerger<T> operator = null; + PCollectionList<T> collections = context.getInput(); + if (collections.size() > 2) { + throw new UnsupportedOperationException("Currently supports only 2 collections: " + transform); + } + for (PCollection<T> collection : collections.getAll()) { + if (null == operator) { + operator = new StreamMerger<T>(); + context.addStream(collection, operator.data1); + } else { + context.addStream(collection, operator.data2); + } + } + context.addOperator(operator, operator.out); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java new file mode 100644 index 0000000..43c82a9 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@link GroupByKey} translation to Apex operator. 
+ */ +public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(GroupByKey<K, V> transform, TranslationContext context) { + + PCollection<KV<K, V>> input = context.getInput(); + ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(), input); + context.addOperator(group, group.output); + context.addStream(input, group.input); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java new file mode 100644 index 0000000..a958234 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; +import org.apache.beam.runners.apex.translators.utils.NoOpSideInputReader; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@link ParDo.Bound} is translated to Apex operator that wraps the {@link DoFn} + */ +public class ParDoBoundTranslator<InputT, OutputT> implements + TransformTranslator<ParDo.Bound<InputT, OutputT>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { + OldDoFn<InputT, OutputT> doFn = transform.getFn(); + PCollection<OutputT> output = context.getOutput(); + ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(), + doFn, output.getWindowingStrategy(), new NoOpSideInputReader()); + context.addOperator(operator, operator.output); + context.addStream(context.getInput(), operator.input); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java new file mode 100644 index 0000000..b53e4dd --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; + +import com.datatorrent.api.InputOperator; + +/** + * {@link Read.Unbounded} is translated to Apex {@link InputOperator} + * that wraps {@link UnboundedSource}. + */ +public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(Read.Unbounded<T> transform, TranslationContext context) { + UnboundedSource<T, ?> unboundedSource = transform.getSource(); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( + unboundedSource, context.getPipelineOptions()); + context.addOperator(operator, operator.output); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java new file mode 100644 index 0000000..1a99885 --- /dev/null +++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translators; + + +import org.apache.beam.sdk.transforms.PTransform; + +import java.io.Serializable; + +/** + * translates {@link PTransform} to Apex functions. + */ +public interface TransformTranslator<T extends PTransform<?,?>> extends Serializable { + void translate(T transform, TranslationContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java new file mode 100644 index 0000000..92afd58 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translators; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.api.Operator.OutputPort; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Maintains context data for {@link TransformTranslator}s. 
+ */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class TranslationContext { + + private final ApexPipelineOptions pipelineOptions; + private AppliedPTransform<?, ?, ?> currentTransform; + private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>(); + private final Map<String, Operator> operators = new HashMap<>(); + + public TranslationContext(ApexPipelineOptions pipelineOptions) { + this.pipelineOptions = pipelineOptions; + } + + public void setCurrentTransform(TransformTreeNode treeNode) { + this.currentTransform = AppliedPTransform.of(treeNode.getFullName(), + treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform()); + } + + public ApexPipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + public <InputT extends PInput> InputT getInput() { + return (InputT) getCurrentTransform().getInput(); + } + + public <OutputT extends POutput> OutputT getOutput() { + return (OutputT) getCurrentTransform().getOutput(); + } + + private AppliedPTransform<?, ?, ?> getCurrentTransform() { + checkArgument(currentTransform != null, "current transform not set"); + return currentTransform; + } + + public void addOperator(Operator operator, OutputPort port) { + // Apex DAG requires a unique operator name + // use the transform's name and make it unique + String name = getCurrentTransform().getFullName(); + for (int i=1; this.operators.containsKey(name); name = getCurrentTransform().getFullName() + i++); + this.operators.put(name, operator); + PCollection<?> output = getOutput(); + this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>())); + } + + /** + * Add operator that is internal to a transformation. 
+ * @param output + * @param operator + * @param port + * @param name + */ + public <T> PInput addInternalOperator(Operator operator, OutputPort port, String name, Coder<T> coder) { + checkArgument(this.operators.get(name) == null, "duplicate operator " + name); + this.operators.put(name, operator); + PCollection<T> input = getInput(); + PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + output.setCoder(coder); + this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>())); + return output; + } + + public void addStream(PInput input, InputPort inputPort) { + Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input); + checkArgument(stream != null, "no upstream operator defined"); + stream.getRight().add(inputPort); + } + + public void populateDAG(DAG dag) { + for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) { + dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue()); + } + int streamIndex = 0; + for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.streams.entrySet()) { + List<InputPort<?>> sinksList = streamEntry.getValue().getRight(); + InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]); + if (sinks.length > 0) { + dag.addStream("stream"+streamIndex++, streamEntry.getValue().getLeft(), sinks); + for (InputPort port : sinks) { + PCollection pc = streamEntry.getKey(); + Coder coder = pc.getCoder(); + if (pc.getWindowingStrategy() != null) { + coder = FullWindowedValueCoder.of(pc.getCoder(), + pc.getWindowingStrategy().getWindowFn().windowCoder() + ); + } + Coder<Object> wrapperCoder = ApexStreamTuple.ApexStreamTupleCoder.of(coder); + CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder); + dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec); + } + } + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java new file mode 100644 index 0000000..4608c92 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translators.functions; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItems; +import org.apache.beam.sdk.util.SystemReduceFn; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import 
com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +/** + * Apex operator for Beam {@link GroupByKey}. + * This operator expects the input stream already partitioned by K, + * which is determined by the {@link StreamCodec} on the input port. + * + * @param <K> + * @param <V> + */ +public class ApexGroupByKeyOperator<K, V> implements Operator +{ + @Bind(JavaSerializer.class) + private WindowingStrategy<V, BoundedWindow> windowingStrategy; + @Bind(JavaSerializer.class) + private Coder<V> valueCoder; + + @Bind(JavaSerializer.class) + private final SerializablePipelineOptions serializedOptions; + @Bind(JavaSerializer.class) + private Map<K, StateInternals<K>> perKeyStateInternals = new HashMap<>(); + private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); + + private transient ProcessContext context; + private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn; + private transient ApexTimerInternals timerInternals = new ApexTimerInternals(); + private Instant inputWatermark = new Instant(0); + + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() + { + @Override + public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) + { + //System.out.println("\n***RECEIVED: " +t); + try { + if (t instanceof ApexStreamTuple.WatermarkTuple) { + ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t; + processWatermark(mark); + output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp())); + return; + } + 
processElement(t.getValue()); + } catch (Exception e) { + Throwables.propagate(e); + } + } + }; + + @OutputPortFieldAnnotation(optional=true) + public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> output = new DefaultOutputPort<>(); + + @SuppressWarnings("unchecked") + public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input) + { + Preconditions.checkNotNull(pipelineOptions); + this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); + this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>)input.getWindowingStrategy(); + this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder(); + } + + @SuppressWarnings("unused") // for Kryo + private ApexGroupByKeyOperator() + { + this.serializedOptions = null; + } + + @Override + public void beginWindow(long l) + { + } + + @Override + public void endWindow() + { + } + + @Override + public void setup(OperatorContext context) + { + StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory(); + this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory, + SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder)); + this.context = new ProcessContext(fn, this.timerInternals); + } + + @Override + public void teardown() + { + } + + /** + * Returns the list of timers that are ready to fire. These are the timers + * that are registered to be triggered at a time before the current watermark. + * We keep these timers in a Set, so that they are deduplicated, as the same + * timer can be registered multiple times. + */ + private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { + + // we keep the timers to return in a different list and launch them later + // because we cannot prevent a trigger from registering another trigger, + // which would lead to concurrent modification exception. 
+ Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create(); + + Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); + + Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); + while (timerIt.hasNext()) { + TimerInternals.TimerData timerData = timerIt.next(); + if (timerData.getTimestamp().isBefore(currentWatermark)) { + toFire.put(keyWithTimers.getKey(), timerData); + timerIt.remove(); + } + } + + if (keyWithTimers.getValue().isEmpty()) { + it.remove(); + } + } + return toFire; + } + + private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception { + final KV<K, V> kv = windowedValue.getValue(); + final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPane()); + + KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem( + kv.getKey(), + Collections.singletonList(updatedWindowedValue)); + + context.setElement(kwi, getStateInternalsForKey(kwi.key())); + fn.processElement(context); + } + + private StateInternals<K> getStateInternalsForKey(K key) { + StateInternals<K> stateInternals = perKeyStateInternals.get(key); + if (stateInternals == null) { + //Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); + //OutputTimeFn<? 
super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn(); + stateInternals = InMemoryStateInternals.forKey(key); + perKeyStateInternals.put(key, stateInternals); + } + return stateInternals; + } + + private void registerActiveTimer(K key, TimerInternals.TimerData timer) { + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + if (timersForKey == null) { + timersForKey = new HashSet<>(); + } + timersForKey.add(timer); + activeTimers.put(key, timersForKey); + } + + private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + if (timersForKey != null) { + timersForKey.remove(timer); + if (timersForKey.isEmpty()) { + activeTimers.remove(key); + } else { + activeTimers.put(key, timersForKey); + } + } + } + + private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception { + this.inputWatermark = new Instant(mark.getTimestamp()); + Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); + if (!timers.isEmpty()) { + for (K key : timers.keySet()) { + KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(key)); + context.setElement(kwi, getStateInternalsForKey(kwi.key())); + fn.processElement(context); + } + } + } + + private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?, KeyedWorkItem<K, V>>.ProcessContext { + + private final ApexTimerInternals timerInternals; + private StateInternals<K> stateInternals; + private KeyedWorkItem<K, V> element; + + public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function, + ApexTimerInternals timerInternals) { + function.super(); + this.timerInternals = Preconditions.checkNotNull(timerInternals); + } + + public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) { + this.element = element; + this.stateInternals = stateForKey; + } + + @Override + public 
KeyedWorkItem<K, V> element() { + return this.element; + } + + @Override + public Instant timestamp() { + throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems."); + } + + @Override + public PipelineOptions getPipelineOptions() { + return serializedOptions.get(); + } + + @Override + public void output(KV<K, Iterable<V>> output) { + throw new UnsupportedOperationException( + "output() is not available when processing KeyedWorkItems."); + } + + @Override + public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) { + throw new UnsupportedOperationException( + "outputWithTimestamp() is not available when processing KeyedWorkItems."); + } + + @Override + public PaneInfo pane() { + throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems."); + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "window() is not available when processing KeyedWorkItems."); + } + + @Override + public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() { + return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() { + + @Override + public StateInternals<K> stateInternals() { + return stateInternals; + } + + @Override + public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + System.out.println("\n***EMITTING: " + output + ", timestamp=" + timestamp); + ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane))); + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + + @Override + public Collection<? 
extends BoundedWindow> windows() { + throw new UnsupportedOperationException("windows() is not available in Streaming mode."); + } + + @Override + public PaneInfo pane() { + throw new UnsupportedOperationException("pane() is not available in Streaming mode."); + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); + } + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new RuntimeException("sideInput() is not available in Streaming mode."); + } + }; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new RuntimeException("sideInput() is not supported in Streaming mode."); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + // ignore the side output, this can happen when a user does not register + // side outputs but then outputs using a freshly created TupleTag. + throw new RuntimeException("sideOutput() is not available when grouping by window."); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutput(tag, output); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + throw new UnsupportedOperationException(); + } + } + + /** + * An implementation of Beam's {@link TimerInternals}. 
+ * + */ + public class ApexTimerInternals implements TimerInternals { + + @Override + public void setTimer(TimerData timerKey) + { + registerActiveTimer(context.element().key(), timerKey); + } + + @Override + public void deleteTimer(TimerData timerKey) + { + unregisterActiveTimer(context.element().key(), timerKey); + } + + @Override + public Instant currentProcessingTime() + { + return Instant.now(); + } + + @Override + public Instant currentSynchronizedProcessingTime() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public Instant currentInputWatermarkTime() + { + return inputWatermark; + } + + @Override + public Instant currentOutputWatermarkTime() + { + // TODO Auto-generated method stub + return null; + } + + } + + private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable + { + @Override + public StateInternals<K> stateInternalsForKey(K key) + { + return getStateInternalsForKey(key); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java new file mode 100644 index 0000000..8005832 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.runners.apex.translators.functions;

import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.DoFnRunner;
import org.apache.beam.sdk.util.DoFnRunners;
import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import com.esotericsoftware.kryo.serializers.JavaSerializer;

/**
 * Apex operator for Beam {@link DoFn}.
 *
 * <p>Data tuples arriving on {@link #input} are handed to a {@link DoFnRunner} that is created in
 * {@link #setup(OperatorContext)}; watermark tuples are forwarded to {@link #output} unchanged.
 * A bundle is started in {@link #beginWindow(long)} and finished in {@link #endWindow()}.
 *
 * <p>The runner is created with an empty side-output tag list and a no-op aggregator factory.
 */
public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {

  private transient final TupleTag<OutputT> mainTag = new TupleTag<OutputT>();
  private transient DoFnRunner<InputT, OutputT> doFnRunner;

  // These fields are bound to Java serialization instead of Kryo's default field serializer.
  @Bind(JavaSerializer.class)
  private final SerializablePipelineOptions pipelineOptions;
  @Bind(JavaSerializer.class)
  private final OldDoFn<InputT, OutputT> doFn;
  @Bind(JavaSerializer.class)
  private final WindowingStrategy<?, ?> windowingStrategy;
  @Bind(JavaSerializer.class)
  private final SideInputReader sideInputReader;

  /**
   * Creates the operator.
   *
   * @param pipelineOptions options passed to the {@link DoFnRunner}
   * @param doFn the function to execute for every data tuple
   * @param windowingStrategy windowing strategy passed to the {@link DoFnRunner}
   * @param sideInputReader side input access passed to the {@link DoFnRunner}
   */
  public ApexParDoOperator(
      ApexPipelineOptions pipelineOptions,
      OldDoFn<InputT, OutputT> doFn,
      WindowingStrategy<?, ?> windowingStrategy,
      SideInputReader sideInputReader) {
    this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
    this.doFn = doFn;
    this.windowingStrategy = windowingStrategy;
    this.sideInputReader = sideInputReader;
  }

  @SuppressWarnings("unused") // for Kryo
  private ApexParDoOperator() {
    this(null, null, null, null);
  }

  /** Input port: forwards watermarks, runs the function on everything else. */
  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
        @Override
        public void process(ApexStreamTuple<WindowedValue<InputT>> t) {
          if (t instanceof ApexStreamTuple.WatermarkTuple) {
            output.emit(t);
          } else {
            doFnRunner.processElement(t.getValue());
          }
        }
      };

  /** Output port for watermark tuples and for everything the function outputs. */
  @OutputPortFieldAnnotation(optional=true)
  public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();

  /**
   * Emits {@code tuple} as a data tuple on {@link #output}.
   *
   * <p>NOTE(review): {@code tag} is not inspected, so a value output under any tag ends up on the
   * single output port. Confirm this is intended until side outputs are supported.
   */
  @Override
  public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
    output.emit(ApexStreamTuple.DataTuple.of(tuple));
  }

  /** Creates the {@link DoFnRunner}; it is transient and therefore rebuilt on every setup. */
  @Override
  public void setup(OperatorContext context) {
    this.doFnRunner = DoFnRunners.simpleRunner(pipelineOptions.get(),
        doFn,
        sideInputReader,
        this,
        mainTag,
        TupleTagList.empty().getAll(),
        new NoOpStepContext(),
        new NoOpAggregatorFactory(),
        windowingStrategy
        );
  }

  /** Starts a bundle for the streaming window that begins. */
  @Override
  public void beginWindow(long windowId) {
    doFnRunner.startBundle();
  }

  /** Finishes the bundle started in {@link #beginWindow(long)}. */
  @Override
  public void endWindow() {
    doFnRunner.finishBundle();
  }

  /**
   * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
   * It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}.
   *
   * <p>The aggregators it creates discard every added value but report the name and combine
   * function they were created with.
   */
  public class NoOpAggregatorFactory implements AggregatorFactory {

    private NoOpAggregatorFactory() {
    }

    @Override
    public <AggInputT, AccumT, AggOutputT> Aggregator<AggInputT, AggOutputT>
        createAggregatorForDoFn(
            Class<?> fnClass, ExecutionContext.StepContext step,
            final String name, final CombineFn<AggInputT, AccumT, AggOutputT> combine) {
      return new Aggregator<AggInputT, AggOutputT>() {

        @Override
        public void addValue(AggInputT value) {
          // Intentionally empty: values are not aggregated yet.
        }

        @Override
        public String getName() {
          return name;
        }

        @Override
        public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
          return combine;
        }

      };
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
new file mode
100644 index 0000000..39114fe --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translators.io; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple; +import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; + +import org.joda.time.Instant; + +import com.datatorrent.api.Context.OperatorContext; +import com.google.common.base.Throwables; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; + +import java.io.IOException; + 
+/** + * Apex input operator that wraps Beam UnboundedSource. + */ +public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> + implements InputOperator { + + @Bind(JavaSerializer.class) + private final SerializablePipelineOptions pipelineOptions; + @Bind(JavaSerializer.class) + private final UnboundedSource<OutputT, CheckpointMarkT> source; + private transient UnboundedSource.UnboundedReader<OutputT> reader; + private transient boolean available = false; + public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>(); + + public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) { + this.pipelineOptions = new SerializablePipelineOptions(options); + this.source = source; + } + + @SuppressWarnings("unused") // for Kryo + private ApexReadUnboundedInputOperator() { + this.pipelineOptions = null; this.source = null; + } + + @Override + public void beginWindow(long windowId) + { + Instant mark = reader.getWatermark(); + output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark.getMillis())); + if (!available && source instanceof ValuesSource) { + // if it's a Create transformation and the input was consumed, + // terminate the stream (allows tests to finish faster) + BaseOperator.shutdown(); + } + } + + @Override + public void endWindow() + { + } + + @Override + public void setup(OperatorContext context) + { + try { + reader = source.createReader(this.pipelineOptions.get(), null); + available = reader.start(); + } catch (IOException e) { + Throwables.propagate(e); + } + } + + @Override + public void teardown() + { + try { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + Throwables.propagate(e); + } + } + + @Override + public void emitTuples() + { + try { + if (!available) { + available = reader.advance(); + } + if (available) { + OutputT data = reader.getCurrent(); 
+ Instant timestamp = reader.getCurrentTimestamp(); + available = reader.advance(); + output.emit(DataTuple.of(WindowedValue.of( + data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING))); + } + } catch (Exception e) { + Throwables.propagate(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java new file mode 100644 index 0000000..2c4b298 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translators.io; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; + +import org.joda.time.Instant; + +import com.google.common.base.Throwables; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * unbounded source that reads from a Java {@link Iterable}. + */ +public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { + private static final long serialVersionUID = 1L; + + private final byte[] codedValues; + private final IterableCoder<T> iterableCoder; + + public ValuesSource(Iterable<T> values, Coder<T> coder) { + this.iterableCoder = IterableCoder.of(coder); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + iterableCoder.encode(values, bos, Context.OUTER); + } catch (IOException ex) { + Throwables.propagate(ex); + } + this.codedValues = bos.toByteArray(); + } + + @Override + public java.util.List<? 
extends UnboundedSource<T, CheckpointMark>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.singletonList(this); + } + + @Override + public UnboundedReader<T> createReader(PipelineOptions options, + @Nullable CheckpointMark checkpointMark) { + ByteArrayInputStream bis = new ByteArrayInputStream(codedValues); + try { + Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER); + return new ValuesReader<>(values, this); + } catch (IOException ex) { + throw Throwables.propagate(ex); + } + } + + @Nullable + @Override + public Coder<CheckpointMark> getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() { + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return iterableCoder.getElemCoder(); + } + + private static class ValuesReader<T> extends UnboundedReader<T> { + + private final Iterable<T> values; + private final UnboundedSource<T, CheckpointMark> source; + private transient Iterator<T> iterator; + private T current; + + public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) { + this.values = values; + this.source = source; + } + + @Override + public boolean start() throws IOException { + if (null == iterator) { + iterator = values.iterator(); + } + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (iterator.hasNext()) { + current = iterator.next(); + return true; + } else { + return false; + } + } + + @Override + public T getCurrent() throws NoSuchElementException { + return current; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource<T, ?> getCurrentSource() { + return 
source; + } + } +}