http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java new file mode 100644 index 0000000..ecb0adb --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. + */ +package org.apache.beam.runners.apex.translators.functions;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java index 6ee82ea..3188dfa 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java @@ -18,6 +18,17 @@ package org.apache.beam.runners.apex.translators.io; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Throwables; + +import java.io.IOException; + import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple; @@ -26,27 +37,15 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; - import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.google.common.base.Throwables; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; -import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; -import com.esotericsoftware.kryo.serializers.JavaSerializer; - -import java.io.IOException; - /** * Apex input operator that wraps Beam {@link UnboundedSource}. */ -public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> - implements InputOperator { +public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT + extends UnboundedSource.CheckpointMark> implements InputOperator { private static final Logger LOG = LoggerFactory.getLogger( ApexReadUnboundedInputOperator.class); private boolean traceTuples = false; @@ -58,10 +57,12 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb private final UnboundedSource<OutputT, CheckpointMarkT> source; private transient UnboundedSource.UnboundedReader<OutputT> reader; private transient boolean available = false; - @OutputPortFieldAnnotation(optional=true) - public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = + new DefaultOutputPort<>(); - public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) { + public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, + ApexPipelineOptions options) { this.pipelineOptions = new SerializablePipelineOptions(options); this.source = source; } @@ -72,8 +73,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb } @Override - public void beginWindow(long windowId) - { + public void beginWindow(long windowId) { if (!available && source instanceof ValuesSource) { // if it's a Create and the input was consumed, emit final watermark emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()); @@ -95,37 +95,33 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb } @Override - public void endWindow() - { + public void endWindow() { } @Override - public void setup(OperatorContext context) - { + public void setup(OperatorContext context) { this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); try { reader = source.createReader(this.pipelineOptions.get(), null); available = reader.start(); } catch (IOException e) { - Throwables.propagate(e); + throw new RuntimeException(e); } } @Override - public void teardown() - { + public void teardown() { try { if (reader != null) { reader.close(); } } catch (IOException e) { - Throwables.propagate(e); + throw new RuntimeException(e); } } @Override - public void emitTuples() - { + public void emitTuples() { try { if (!available) { available = reader.advance(); @@ -141,7 +137,8 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING))); } } catch (Exception e) { - Throwables.propagate(e); + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java index 2c4b298..fadf8ec4 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java @@ -18,16 +18,6 @@ package org.apache.beam.runners.apex.translators.io; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; - -import org.joda.time.Instant; - -import com.google.common.base.Throwables; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -37,8 +27,15 @@ import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Instant; + /** - * unbounded source that reads from a Java {@link Iterable}. + * Unbounded source that reads from a Java {@link Iterable}. */ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { private static final long serialVersionUID = 1L; @@ -52,7 +49,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi try { iterableCoder.encode(values, bos, Context.OUTER); } catch (IOException ex) { - Throwables.propagate(ex); + throw new RuntimeException(ex); } this.codedValues = bos.toByteArray(); } @@ -71,7 +68,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER); return new ValuesReader<>(values, this); } catch (IOException ex) { - throw Throwables.propagate(ex); + throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java new file mode 100644 index 0000000..0d17f19 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. + */ +package org.apache.beam.runners.apex.translators.io; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java new file mode 100644 index 0000000..7d7c6cc --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. + */ +package org.apache.beam.runners.apex.translators; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java index c9bf6dc..a260a66 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.apex.translators.utils; import static com.google.common.base.Preconditions.checkNotNull; +import com.datatorrent.api.Operator; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -32,24 +34,25 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StandardCoder; -import com.datatorrent.api.Operator; - -public interface ApexStreamTuple<T> -{ +/** + * The common interface for all objects transmitted through streams. + * + * @param <T> The actual payload type. + */ +public interface ApexStreamTuple<T> { /** - * Gets the value of the tuple + * Gets the value of the tuple. * * @return */ T getValue(); /** - * Plain tuple class + * Data tuple class. * * @param <T> */ - class DataTuple<T> implements ApexStreamTuple<T> - { + class DataTuple<T> implements ApexStreamTuple<T> { private int unionTag; private T value; @@ -57,86 +60,73 @@ public interface ApexStreamTuple<T> return new DataTuple<>(value, 0); } - private DataTuple(T value, int unionTag) - { + private DataTuple(T value, int unionTag) { this.value = value; this.unionTag = unionTag; } @Override - public T getValue() - { + public T getValue() { return value; } - public void setValue(T value) - { + public void setValue(T value) { this.value = value; } - public int getUnionTag() - { + public int getUnionTag() { return unionTag; } - public void setUnionTag(int unionTag) - { + public void setUnionTag(int unionTag) { this.unionTag = unionTag; } @Override - public String toString() - { + public String toString() { return value.toString(); } } /** - * Tuple that includes a timestamp + * Tuple that includes a timestamp. * * @param <T> */ - class TimestampedTuple<T> extends DataTuple<T> - { + class TimestampedTuple<T> extends DataTuple<T> { private long timestamp; - public TimestampedTuple(long timestamp, T value) - { + public TimestampedTuple(long timestamp, T value) { super(value, 0); this.timestamp = timestamp; } - public long getTimestamp() - { + public long getTimestamp() { return timestamp; } - public void setTimestamp(long timestamp) - { + public void setTimestamp(long timestamp) { this.timestamp = timestamp; } } /** - * Tuple that represents a watermark + * Tuple that represents a watermark. * * @param <T> */ - class WatermarkTuple<T> extends TimestampedTuple<T> - { + class WatermarkTuple<T> extends TimestampedTuple<T> { public static <T> WatermarkTuple<T> of(long timestamp) { return new WatermarkTuple<>(timestamp); } - protected WatermarkTuple(long timestamp) - { + protected WatermarkTuple(long timestamp) { super(timestamp, null); } @Override - public String toString() - { + public String toString() { return "[Watermark " + getTimestamp() + "]"; } } @@ -161,18 +151,17 @@ public interface ApexStreamTuple<T> throws CoderException, IOException { if (value instanceof WatermarkTuple) { outStream.write(1); - new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>)value).getTimestamp()); + new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp()); } else { outStream.write(0); - outStream.write(((DataTuple<?>)value).unionTag); + outStream.write(((DataTuple<?>) value).unionTag); valueCoder.encode(value.getValue(), outStream, context); } } @Override public ApexStreamTuple<T> decode(InputStream inStream, Context context) - throws CoderException, IOException - { + throws CoderException, IOException { int b = inStream.read(); if (b == 1) { return new WatermarkTuple<T>(new DataInputStream(inStream).readLong()); @@ -183,14 +172,12 @@ public interface ApexStreamTuple<T> } @Override - public List<? extends Coder<?>> getCoderArguments() - { + public List<? extends Coder<?>> getCoderArguments() { return Arrays.<Coder<?>>asList(valueCoder); } @Override - public void verifyDeterministic() throws NonDeterministicException - { + public void verifyDeterministic() throws NonDeterministicException { verifyDeterministic( this.getClass().getSimpleName() + " requires a deterministic valueCoder", valueCoder); @@ -205,10 +192,12 @@ public interface ApexStreamTuple<T> } - final class Logging - { - public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) - { + /** + * Central if data tuples received on and emitted from ports should be logged. + * Should be called in setup and value cached in operator. + */ + final class Logging { + public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) { return options.isTupleTracingEnabled(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java index c18765b..61e3b83 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.apex.translators.utils; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.netlet.util.Slice; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -25,15 +28,10 @@ import java.io.Serializable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; -import com.datatorrent.api.StreamCodec; -import com.datatorrent.netlet.util.Slice; -import com.google.common.base.Throwables; - /** * The Apex {@link StreamCodec} adapter for using Beam {@link Coder}. */ public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable { - private static final long serialVersionUID = 1L; private final Coder<? super Object> coder; @@ -42,31 +40,29 @@ public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializabl } @Override - public Object fromByteArray(Slice fragment) - { - ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length); + public Object fromByteArray(Slice fragment) { + ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset, + fragment.length); try { return coder.decode(bis, Context.OUTER); } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @Override - public Slice toByteArray(Object wv) - { + public Slice toByteArray(Object wv) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { coder.encode(wv, bos, Context.OUTER); } catch (IOException e) { - Throwables.propagate(e); + throw new RuntimeException(e); } return new Slice(bos.toByteArray()); } @Override - public int getPartition(Object o) - { + public int getPartition(Object o) { return o.hashCode(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java index 43d92f6..3b19c37 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.apex.translators.utils; +import java.io.IOException; +import java.io.Serializable; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ExecutionContext; @@ -25,14 +28,10 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; -import java.io.IOException; -import java.io.Serializable; - /** * Serializable {@link ExecutionContext.StepContext} that does nothing. */ public class NoOpStepContext implements ExecutionContext.StepContext, Serializable { - private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java index 7f7b3ef..d32b869 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.apex.translators.utils; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -25,37 +27,34 @@ import java.io.ObjectOutput; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; -import com.fasterxml.jackson.databind.ObjectMapper; - /** - * A wrapper to enable serialization of {@link PipelineOptions} + * A wrapper to enable serialization of {@link PipelineOptions}. */ public class SerializablePipelineOptions implements Externalizable { private transient ApexPipelineOptions pipelineOptions; - + public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) { this.pipelineOptions = pipelineOptions; } public SerializablePipelineOptions() { } - + public ApexPipelineOptions get() { return this.pipelineOptions; } - + @Override - public void writeExternal(ObjectOutput out) throws IOException - { + public void writeExternal(ObjectOutput out) throws IOException { out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions)); } @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException - { + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { String s = in.readUTF(); - this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class).as(ApexPipelineOptions.class); + this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class) + .as(ApexPipelineOptions.class); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java index 2de737d..c06c500 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java @@ -17,26 +17,24 @@ */ package org.apache.beam.runners.apex.translators.utils; -import java.io.IOException; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; - import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoSerializable; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Throwables; + +import java.io.IOException; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; /** * A {@link KryoSerializable} holder that uses the specified {@link Coder}. * @param <T> */ -public class ValueAndCoderKryoSerializable<T> implements KryoSerializable -{ - private static JavaSerializer JAVA_SERIALIZER = new JavaSerializer(); +public class ValueAndCoderKryoSerializable<T> implements KryoSerializable { + private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer(); private T value; private Coder<T> coder; @@ -54,27 +52,25 @@ public class ValueAndCoderKryoSerializable<T> implements KryoSerializable } @Override - public void write(Kryo kryo, Output output) - { + public void write(Kryo kryo, Output output) { try { kryo.writeClass(output, coder.getClass()); kryo.writeObject(output, coder, JAVA_SERIALIZER); coder.encode(value, output, Context.OUTER); } catch (IOException e) { - Throwables.propagate(e); + throw new RuntimeException(e); } } @Override - public void read(Kryo kryo, Input input) - { + public void read(Kryo kryo, Input input) { try { @SuppressWarnings("unchecked") Class<Coder<T>> type = kryo.readClass(input).getType(); coder = kryo.readObject(input, type, JAVA_SERIALIZER); value = coder.decode(input, Context.OUTER); } catch (IOException e) { - Throwables.propagate(e); + throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java new file mode 100644 index 0000000..4aeba35 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. + */ +package org.apache.beam.runners.apex.translators.utils; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java deleted file mode 100644 index 3573d31..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.examples; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.TestApexRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.dataflow.TestCountingSource; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * For debugging only. - */ -@Ignore -@RunWith(JUnit4.class) -public class IntTest implements java.io.Serializable -{ - - @Test - public void test() - { - ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); - options.setTupleTracingEnabled(true); - options.setRunner(TestApexRunner.class); - Pipeline p = Pipeline.create(options); -boolean timeBound = false; - - - TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting(); -//List<KV<Integer,Integer>> values = Lists.newArrayList( -// KV.of(0, 99),KV.of(0, 99),KV.of(0, 98)); - -//UnboundedSource<KV<Integer,Integer>, ?> source = new ValuesSource<>(values, -// KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); - - if (true) { - source = source.withDedup(); - } - - PCollection<KV<Integer, Integer>> output = - timeBound - ? p.apply(Read.from(source).withMaxReadTime(Duration.millis(200))) - : p.apply(Read.from(source).withMaxNumRecords(NUM_RECORDS)); - - List<KV<Integer, Integer>> expectedOutput = new ArrayList<>(); - for (int i = 0; i < NUM_RECORDS; i++) { - expectedOutput.add(KV.of(0, i)); - } - - // Because some of the NUM_RECORDS elements read are dupes, the final output - // will only have output from 0 to n where n < NUM_RECORDS. - PAssert.that(output).satisfies(new Checker(true, timeBound)); - - - p.run(); - return; - } - - private static final int NUM_RECORDS = 10; - private static class Checker implements SerializableFunction<Iterable<KV<Integer, Integer>>, Void> - { - private final boolean dedup; - private final boolean timeBound; - - Checker(boolean dedup, boolean timeBound) - { - this.dedup = dedup; - this.timeBound = timeBound; - } - - @Override - public Void apply(Iterable<KV<Integer, Integer>> input) - { - List<Integer> values = new ArrayList<>(); - for (KV<Integer, Integer> kv : input) { - assertEquals(0, (int)kv.getKey()); - values.add(kv.getValue()); - } - if (timeBound) { - assertTrue(values.size() >= 1); - } else if (dedup) { - // Verify that at least some data came through. The chance of 90% of the input - // being duplicates is essentially zero. - assertTrue(values.size() > NUM_RECORDS / 10 && values.size() <= NUM_RECORDS); - } else { - assertEquals(NUM_RECORDS, values.size()); - } - Collections.sort(values); - for (int i = 0; i < values.size(); i++) { - assertEquals(i, (int)values.get(i)); - } - //if (finalizeTracker != null) { - // assertThat(finalizeTracker, containsInAnyOrder(values.size() - 1)); - //} - return null; - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java index 582d839..6ab2e8e 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java @@ -21,8 +21,8 @@ package org.apache.beam.runners.apex.examples; import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -37,7 +37,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +75,8 @@ public class StreamingWordCountTest { @ProcessElement public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + - " @ " + c.timestamp().toString(); + String row = c.element().getKey() + " - " + c.element().getValue() + + " @ " + c.timestamp().toString(); LOG.debug("output {}", row); c.output(row); RESULTS.put(c.element().getKey(), c.element().getValue()); @@ -103,17 +102,19 @@ public class StreamingWordCountTest { wordCounts.apply(ParDo.of(new FormatAsStringFn())); - ApexRunnerResult result = (ApexRunnerResult)p.run(); + ApexRunnerResult result = (ApexRunnerResult) p.run(); Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)")); long timeout = System.currentTimeMillis() + 30000; while (System.currentTimeMillis() < timeout) { - if (FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar")) { + if (FormatAsStringFn.RESULTS.containsKey("foo") + && FormatAsStringFn.RESULTS.containsKey("bar")) { break; } Thread.sleep(1000); } result.cancel(); - Assert.assertTrue(FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar")); + Assert.assertTrue( + FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar")); FormatAsStringFn.RESULTS.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java index 29351e9..8132ee5 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java @@ -18,14 +18,6 @@ package org.apache.beam.runners.apex.examples; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.joda.time.Instant; - -import com.google.common.base.Throwables; - import java.io.IOException; import java.io.Serializable; import java.util.Collections; @@ -34,6 +26,12 @@ import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Instant; + /** * unbounded source that reads from text. */ @@ -102,7 +100,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource try { Thread.sleep(index); // allow for downstream processing to complete } catch (InterruptedException e) { - Throwables.propagate(e); + throw new RuntimeException(e); } return true; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java new file mode 100644 index 0000000..4308c80 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. + */ +package org.apache.beam.runners.apex.examples; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java index 6b181ba..7defc77 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java @@ -18,9 +18,16 @@ package org.apache.beam.runners.apex.translators; +import com.google.common.collect.Sets; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -30,21 +37,13 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; - -import com.google.common.collect.Sets; - import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - /** - * integration test for {@link FlattenPCollectionTranslator}. + * Integration test for {@link FlattenPCollectionTranslator}. */ public class FlattenPCollectionTranslatorTest { private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class); @@ -70,29 +69,30 @@ public class FlattenPCollectionTranslatorTest { PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.<String>pCollections()); actual.apply(ParDo.of(new EmbeddedCollector())); - ApexRunnerResult result = (ApexRunnerResult)p.run(); + ApexRunnerResult result = (ApexRunnerResult) p.run(); // TODO: verify translation result.getApexDAG(); long timeout = System.currentTimeMillis() + 30000; - while (System.currentTimeMillis() < timeout && EmbeddedCollector.results.size() < expected.size()) { + while (System.currentTimeMillis() < timeout + && EmbeddedCollector.RESULTS.size() < expected.size()) { LOG.info("Waiting for expected results."); Thread.sleep(500); } - Assert.assertEquals("number results", expected.size(), EmbeddedCollector.results.size()); - Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.results)); + Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size()); + Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS)); } @SuppressWarnings("serial") private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final ArrayList<Object> results = new ArrayList<>(); + protected static final ArrayList<Object> RESULTS = new ArrayList<>(); public EmbeddedCollector() { } @Override public void processElement(ProcessContext c) throws Exception { - results.add(c.element()); + RESULTS.add(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java index e4d4606..cb764d6 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java @@ -18,9 +18,22 @@ package org.apache.beam.runners.apex.translators; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -35,28 +48,13 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; - -import com.datatorrent.api.DAG; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Assert; import org.junit.Test; -import java.io.IOException; -import java.io.Serializable; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - /** - * integration test for {@link GroupByKeyTranslator}. + * Integration test for {@link GroupByKeyTranslator}. */ public class GroupByKeyTranslatorTest { @@ -94,31 +92,30 @@ public class GroupByKeyTranslatorTest { .apply(ParDo.of(new EmbeddedCollector())) ; - ApexRunnerResult result = (ApexRunnerResult)p.run(); - // TODO: verify translation - DAG dag = result.getApexDAG(); + ApexRunnerResult result = (ApexRunnerResult) p.run(); + result.getApexDAG(); long timeout = System.currentTimeMillis() + 30000; while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.results.containsAll(expected)) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { break; } Thread.sleep(1000); } - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results); + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } @SuppressWarnings("serial") private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final HashSet<Object> results = new HashSet<>(); + protected static final HashSet<Object> RESULTS = new HashSet<>(); public EmbeddedCollector() { } @Override public void processElement(ProcessContext c) throws Exception { - results.add(c.element()); + RESULTS.add(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java index b9748ee..ad22acd 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java @@ -21,6 +21,11 @@ package org.apache.beam.runners.apex.translators; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -56,11 +61,6 @@ import org.junit.runners.JUnit4; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.DAG; -import com.datatorrent.lib.util.KryoCloneUtils; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - /** * integration test for {@link ParDoBoundTranslator}. */ @@ -83,7 +83,7 @@ public class ParDoBoundTranslatorTest { .apply(ParDo.of(new Add(5))) .apply(ParDo.of(new EmbeddedCollector())); - ApexRunnerResult result = (ApexRunnerResult)p.run(); + ApexRunnerResult result = (ApexRunnerResult) p.run(); DAG dag = result.getApexDAG(); DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values"); @@ -96,13 +96,13 @@ public class ParDoBoundTranslatorTest { long timeout = System.currentTimeMillis() + 30000; while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.results.containsAll(expected)) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { break; } LOG.info("Waiting for expected results."); Thread.sleep(1000); } - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results); + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } @SuppressWarnings("serial") @@ -121,14 +121,14 @@ public class ParDoBoundTranslatorTest { @SuppressWarnings("serial") private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final HashSet<Object> results = new HashSet<>(); + protected static final HashSet<Object> RESULTS = new HashSet<>(); public EmbeddedCollector() { } @Override public void processElement(ProcessContext c) throws Exception { - results.add(c.element()); + RESULTS.add(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java index f954537..71c5354 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java @@ -18,9 +18,20 @@ package org.apache.beam.runners.apex.translators; +import com.datatorrent.api.DAG; +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.ApexRunnerResult; import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; import org.apache.beam.runners.apex.translators.utils.CollectionSource; import org.apache.beam.sdk.Pipeline; @@ -30,23 +41,11 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; - -import com.datatorrent.api.DAG; -import com.google.common.collect.ContiguousSet; -import com.google.common.collect.DiscreteDomain; -import com.google.common.collect.Lists; -import com.google.common.collect.Range; -import com.google.common.collect.Sets; - import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - /** * integration test for {@link ReadUnboundedTranslator}. */ @@ -57,7 +56,7 @@ public class ReadUnboundTranslatorTest { public void test() throws Exception { ApexPipelineOptions options = PipelineOptionsFactory.create() .as(ApexPipelineOptions.class); - EmbeddedCollector.results.clear(); + EmbeddedCollector.RESULTS.clear(); options.setApplicationName("ReadUnbound"); options.setRunner(ApexRunner.class); Pipeline p = Pipeline.create(options); @@ -67,7 +66,7 @@ public class ReadUnboundTranslatorTest { p.apply(Read.from(source)) .apply(ParDo.of(new EmbeddedCollector())); - ApexRunnerResult result = (ApexRunnerResult)p.run(); + ApexRunnerResult result = (ApexRunnerResult) p.run(); DAG dag = result.getApexDAG(); DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)"); Assert.assertNotNull(om); @@ -75,20 +74,20 @@ public class ReadUnboundTranslatorTest { long timeout = System.currentTimeMillis() + 30000; while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.results.containsAll(collection)) { + if (EmbeddedCollector.RESULTS.containsAll(collection)) { break; } LOG.info("Waiting for expected results."); Thread.sleep(1000); } - Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.results); + Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.RESULTS); } @Test public void testReadBounded() throws Exception { ApexPipelineOptions options = PipelineOptionsFactory.create() .as(ApexPipelineOptions.class); - EmbeddedCollector.results.clear(); + EmbeddedCollector.RESULTS.clear(); options.setApplicationName("ReadBounded"); options.setRunner(ApexRunner.class); Pipeline p = Pipeline.create(options); @@ -97,7 +96,7 @@ public class ReadUnboundTranslatorTest { p.apply(Read.from(CountingSource.upTo(10))) .apply(ParDo.of(new EmbeddedCollector())); - ApexRunnerResult result = (ApexRunnerResult)p.run(); + ApexRunnerResult result = (ApexRunnerResult) p.run(); DAG dag = result.getApexDAG(); DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)"); Assert.assertNotNull(om); @@ -105,25 +104,25 @@ public class ReadUnboundTranslatorTest { long timeout = System.currentTimeMillis() + 30000; while (System.currentTimeMillis() < timeout) { - if (EmbeddedCollector.results.containsAll(expected)) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { break; } LOG.info("Waiting for expected results."); Thread.sleep(1000); } - Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results); + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } @SuppressWarnings("serial") private static class EmbeddedCollector extends OldDoFn<Object, Void> { - protected static final HashSet<Object> results = new HashSet<>(); + protected static final HashSet<Object> RESULTS = new HashSet<>(); public EmbeddedCollector() { } @Override public void processElement(ProcessContext c) throws Exception { - results.add(c.element()); + RESULTS.add(c.element()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java index a1e8b3e..c368bb2 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java @@ -18,12 +18,6 @@ package org.apache.beam.runners.apex.translators.utils; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; - -import org.joda.time.Instant; - import java.io.IOException; import java.io.Serializable; import java.util.Collection; @@ -34,11 +28,16 @@ import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Instant; + /** * collection as {@link UnboundedSource}, used for tests. */ public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { - + private static final long serialVersionUID = 1L; private final Collection<T> collection; private final Coder<T> coder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java index e2fa9d9..e67efa9 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java @@ -17,29 +17,31 @@ */ package org.apache.beam.runners.apex.translators.utils; -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.BeforeClass; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import com.datatorrent.common.util.FSStorageAgent; -import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import com.esotericsoftware.kryo.serializers.JavaSerializer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.BeforeClass; +import org.junit.Test; + /** * Tests the serialization of PipelineOptions. */ public class PipelineOptionsTest { + /** + * Interface for testing. + */ public interface MyOptions extends ApexPipelineOptions { @Description("Bla bla bla") @Default.String("Hello") @@ -60,7 +62,7 @@ public class PipelineOptionsTest { private static MyOptions options; - private final static String[] args = new String[]{"--testOption=nothing"}; + private static final String[] args = new String[]{"--testOption=nothing"}; @BeforeClass public static void beforeTest() { @@ -74,7 +76,7 @@ public class PipelineOptionsTest { FSStorageAgent.store(bos, wrapper); ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); - MyOptionsWrapper wrapperCopy = (MyOptionsWrapper)FSStorageAgent.retrieve(bis); + MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis); assertNotNull(wrapperCopy.options); assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/resources/log4j.properties b/runners/apex/src/test/resources/log4j.properties index c0efc5d..d1e6b44 100644 --- a/runners/apex/src/test/resources/log4j.properties +++ b/runners/apex/src/test/resources/log4j.properties @@ -18,16 +18,18 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -log4j.rootLogger=DEBUG, testlogger +log4j.rootLogger=OFF, testlogger # A1 is set to be a ConsoleAppender. log4j.appender.testlogger=org.apache.log4j.ConsoleAppender log4j.appender.testlogger.target = System.err log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.testlogger.threshold=${test.log.threshold} +test.log.threshold=DEBUG -log4j.logger.org=debug +log4j.logger.org=info log4j.logger.org.apache.commons.beanutils=warn log4j.logger.com.datatorrent=info -log4j.logger.org.apache.apex=debug +log4j.logger.org.apache.apex=info log4j.logger.org.apache.beam.runners.apex=debug