[03/39] incubator-beam git commit: BEAM-261 Use Apex 3.5.0-SNAPSHOT to use loopback as connect address.
BEAM-261 Use Apex 3.5.0-SNAPSHOT to use loopback as connect address. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7e430d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7e430d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7e430d6 Branch: refs/heads/master Commit: a7e430d6b24de53a116258af75c7eb15d6133b4d Parents: aaf38dd Author: Thomas WeiseAuthored: Wed Aug 31 16:41:52 2016 -0700 Committer: Thomas Weise Committed: Sun Oct 16 23:22:59 2016 -0700 -- runners/apex/pom.xml | 6 +++--- .../beam/runners/apex/translators/CreateValuesTranslator.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7e430d6/runners/apex/pom.xml -- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index bb08b3c..21e53a8 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -35,9 +35,9 @@ jar -3.4.0 +3.5.0-SNAPSHOT 3.4.0 -true +false -Xmx2048m @@ -206,7 +206,7 @@ - org.apache.apex:apex-api:jar:3.4.0 + org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT org.apache.commons:commons-lang3::3.1 com.esotericsoftware.kryo:kryo::2.24.0 com.datatorrent:netlet::1.2.1 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7e430d6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java -- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java index 387b19f..7a29057 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.apex.translators.io.ValuesSource; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PBegin; import com.google.common.base.Throwables; @@ -38,7 +39,7 @@ public class CreateValuesTranslator implements TransformTranslator transform, TranslationContext context) { try { UnboundedSource unboundedSource = new ValuesSource<>(transform.getElements(), - transform.getDefaultOutputCoder(context.getInput())); + transform.getDefaultOutputCoder((PBegin)context.getInput())); ApexReadUnboundedInputOperator operator = new ApexReadUnboundedInputOperator<>(unboundedSource, context.getPipelineOptions()); context.addOperator(operator, operator.output);
[39/39] incubator-beam git commit: Merge apex-runner to master. This closes #1305.
Merge apex-runner to master. This closes #1305. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d069a65 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d069a65 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d069a65 Branch: refs/heads/master Commit: 7d069a65b4bb264bce279b19a1dc0f7372ce43fb Parents: e2c2159 41394c2 Author: Kenneth KnowlesAuthored: Fri Nov 11 18:19:34 2016 -0800 Committer: Kenneth Knowles Committed: Fri Nov 11 18:19:34 2016 -0800 -- examples/java/pom.xml | 35 ++ pom.xml | 6 + runners/apex/README.md | 76 +++ runners/apex/pom.xml| 234 + .../beam/runners/apex/ApexPipelineOptions.java | 60 +++ .../apache/beam/runners/apex/ApexRunner.java| 398 .../beam/runners/apex/ApexRunnerRegistrar.java | 61 +++ .../beam/runners/apex/ApexRunnerResult.java | 110 + .../beam/runners/apex/TestApexRunner.java | 73 +++ .../apache/beam/runners/apex/package-info.java | 22 + .../translation/ApexPipelineTranslator.java | 179 +++ .../translation/CreateValuesTranslator.java | 48 ++ .../FlattenPCollectionTranslator.java | 129 + .../apex/translation/GroupByKeyTranslator.java | 42 ++ .../translation/ParDoBoundMultiTranslator.java | 142 ++ .../apex/translation/ParDoBoundTranslator.java | 64 +++ .../translation/ReadUnboundedTranslator.java| 42 ++ .../apex/translation/TransformTranslator.java | 31 ++ .../apex/translation/TranslationContext.java| 178 +++ .../operators/ApexFlattenOperator.java | 125 + .../operators/ApexGroupByKeyOperator.java | 475 +++ .../operators/ApexParDoOperator.java| 375 +++ .../ApexReadUnboundedInputOperator.java | 155 ++ .../translation/operators/package-info.java | 22 + .../runners/apex/translation/package-info.java | 22 + .../translation/utils/ApexStateInternals.java | 438 + .../apex/translation/utils/ApexStreamTuple.java | 222 + .../utils/CoderAdapterStreamCodec.java | 69 +++ .../apex/translation/utils/NoOpStepContext.java | 72 +++ .../utils/SerializablePipelineOptions.java | 60 +++ .../utils/ValueAndCoderKryoSerializable.java| 77 +++ .../apex/translation/utils/ValuesSource.java| 149 ++ .../apex/translation/utils/package-info.java| 22 + .../runners/apex/ApexRunnerRegistrarTest.java | 47 ++ .../apex/examples/UnboundedTextSource.java | 142 ++ .../runners/apex/examples/WordCountTest.java| 188 .../runners/apex/examples/package-info.java | 22 + .../translation/ApexGroupByKeyOperatorTest.java | 117 + .../FlattenPCollectionTranslatorTest.java | 99 .../translation/GroupByKeyTranslatorTest.java | 246 ++ .../translation/ParDoBoundTranslatorTest.java | 340 + .../translation/ReadUnboundTranslatorTest.java | 129 + .../utils/ApexStateInternalsTest.java | 361 ++ .../translation/utils/CollectionSource.java | 136 ++ .../translation/utils/PipelineOptionsTest.java | 84 .../apex/src/test/resources/log4j.properties| 35 ++ runners/apex/src/test/resources/words.txt | 3 + runners/pom.xml | 1 + 48 files changed, 6163 insertions(+) --
[22/39] incubator-beam git commit: Merge branch 'master' into apex-runner
Merge branch 'master' into apex-runner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fc47ed1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fc47ed1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fc47ed1 Branch: refs/heads/master Commit: 6fc47ed101f02aacacecd5c62c4a026eaba3e12c Parents: fa3a6aa 215980a Author: Thomas WeiseAuthored: Thu Oct 27 19:25:06 2016 -0700 Committer: Thomas Weise Committed: Thu Oct 27 19:25:06 2016 -0700 -- .travis.yml | 22 +- .../beam/examples/DebuggingWordCount.java | 2 +- .../apache/beam/examples/MinimalWordCount.java | 2 +- .../org/apache/beam/examples/WordCount.java | 2 +- .../apache/beam/examples/complete/TfIdf.java| 2 +- .../examples/complete/TopWikipediaSessions.java | 2 +- .../examples/cookbook/BigQueryTornadoes.java| 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/DeDupExample.java| 2 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../beam/examples/cookbook/JoinExamples.java| 2 +- .../examples/cookbook/MaxPerKeyExamples.java| 2 +- .../org/apache/beam/examples/WordCountTest.java | 2 +- .../examples/complete/AutoCompleteTest.java | 6 +- .../beam/examples/complete/TfIdfTest.java | 2 +- .../complete/TopWikipediaSessionsTest.java | 2 +- .../examples/cookbook/DeDupExampleTest.java | 4 +- .../examples/cookbook/JoinExamplesTest.java | 2 +- .../examples/cookbook/TriggerExampleTest.java | 2 +- .../beam/examples/MinimalWordCountJava8.java| 2 +- .../beam/examples/complete/game/GameStats.java | 52 +- .../examples/complete/game/HourlyTeamScore.java | 30 +- .../examples/complete/game/LeaderBoard.java | 43 +- .../beam/examples/complete/game/UserScore.java | 16 +- .../complete/game/utils/WriteToBigQuery.java| 49 +- .../game/utils/WriteWindowedToBigQuery.java | 14 +- .../examples/complete/game/GameStatsTest.java | 2 +- .../complete/game/HourlyTeamScoreTest.java | 2 +- .../examples/complete/game/LeaderBoardTest.java | 10 +- .../examples/complete/game/UserScoreTest.java | 6 +- pom.xml | 47 +- .../runners/core/GroupAlsoByWindowsDoFn.java| 19 - .../runners/direct/CloningBundleFactory.java| 98 .../beam/runners/direct/DirectRunner.java | 7 +- .../runners/direct/DoFnLifecycleManager.java| 56 +- .../GroupAlsoByWindowEvaluatorFactory.java | 8 +- .../direct/ImmutableListBundleFactory.java | 4 +- .../beam/runners/direct/WatermarkManager.java | 17 +- .../direct/WriteWithShardingFactory.java| 6 +- .../direct/CloningBundleFactoryTest.java| 177 +++ .../ConsumerTrackingPipelineVisitorTest.java| 32 +- .../beam/runners/direct/DirectRunnerTest.java | 40 +- .../direct/DoFnLifecycleManagerTest.java| 74 ++- .../EncodabilityEnforcementFactoryTest.java | 6 +- .../ImmutabilityCheckingBundleFactoryTest.java | 8 +- .../ImmutabilityEnforcementFactoryTest.java | 8 +- .../direct/KeyedPValueTrackingVisitorTest.java | 8 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 8 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 87 ++-- .../direct/ParDoSingleEvaluatorFactoryTest.java | 94 ++-- .../runners/direct/WatermarkManagerTest.java| 8 +- .../dataflow/BlockingDataflowRunner.java| 13 +- .../runners/dataflow/DataflowPipelineJob.java | 17 +- .../dataflow/DataflowPipelineTranslator.java| 4 + .../beam/runners/dataflow/DataflowRunner.java | 4 +- .../options/DataflowPipelineOptions.java| 12 + .../beam/runners/dataflow/util/DoFnInfo.java| 43 +- runners/spark/pom.xml | 66 ++- .../runners/spark/SparkPipelineOptions.java | 11 + .../apache/beam/runners/spark/SparkRunner.java | 19 - .../metrics/AggregatorMetricSource.java | 9 +- .../metrics/WithNamedAggregatorsSupport.java| 7 +- .../coders/BeamSparkRunnerRegistrator.java | 46 ++ .../runners/spark/io/EmptyCheckpointMark.java | 52 ++ .../apache/beam/runners/spark/io/KafkaIO.java | 131 - .../beam/runners/spark/io/MicrobatchSource.java | 262 ++ .../beam/runners/spark/io/SourceDStream.java| 156 ++ .../apache/beam/runners/spark/io/SourceRDD.java | 75 ++- .../runners/spark/io/SparkUnboundedSource.java | 167 ++ .../spark/stateful/StateSpecFunctions.java | 167 ++ .../runners/spark/stateful/package-info.java| 22 + .../spark/translation/EvaluationContext.java| 6 +- .../translation/GroupCombineFunctions.java | 66 +-- .../spark/translation/SparkContextFactory.java | 5 +-
[25/39] incubator-beam git commit: Closes #1227
Closes #1227 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51af7e59 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51af7e59 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51af7e59 Branch: refs/heads/master Commit: 51af7e592327ef711673a2b9828536051a6c3898 Parents: 968eb32 77f4ba2 Author: Thomas WeiseAuthored: Wed Nov 2 21:16:24 2016 -0700 Committer: Thomas Weise Committed: Wed Nov 2 21:16:24 2016 -0700 -- examples/java/pom.xml | 31 runners/apex/pom.xml| 2 +- .../beam/runners/apex/ApexPipelineOptions.java | 6 .../runners/apex/ApexPipelineTranslator.java| 2 +- .../apache/beam/runners/apex/ApexRunner.java| 12 .../beam/runners/apex/ApexRunnerResult.java | 27 +++-- .../beam/runners/apex/TestApexRunner.java | 20 +++-- .../io/ApexReadUnboundedInputOperator.java | 13 ++-- .../apex/examples/StreamingWordCountTest.java | 4 +-- .../translators/ParDoBoundTranslatorTest.java | 6 ++-- 10 files changed, 91 insertions(+), 32 deletions(-) --
[23/39] incubator-beam git commit: Adjust for merge from master.
Adjust for merge from master. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/968eb32b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/968eb32b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/968eb32b Branch: refs/heads/master Commit: 968eb32b8a77d5613f7645c0f04ce194588e Parents: 6fc47ed Author: Thomas WeiseAuthored: Thu Oct 27 20:21:52 2016 -0700 Committer: Thomas Weise Committed: Thu Oct 27 20:21:52 2016 -0700 -- .../main/java/org/apache/beam/runners/apex/ApexRunnerResult.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968eb32b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java -- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java index d5613fe..4e3a8d2 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -62,12 +62,12 @@ public class ApexRunnerResult implements PipelineResult { } @Override - public State waitUntilFinish(Duration duration) throws IOException, InterruptedException { + public State waitUntilFinish(Duration duration) { throw new UnsupportedOperationException(); } @Override - public State waitUntilFinish() throws IOException, InterruptedException { + public State waitUntilFinish() { throw new UnsupportedOperationException(); }
[16/39] incubator-beam git commit: Closes #1139
Closes #1139 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/989e3998 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/989e3998 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/989e3998 Branch: refs/heads/master Commit: 989e399874a5f0ebcf7c19f24a7fd18cead7bfba Parents: 0a1b278 7105d92 Author: Thomas WeiseAuthored: Tue Oct 25 11:01:15 2016 -0700 Committer: Thomas Weise Committed: Tue Oct 25 11:01:15 2016 -0700 -- .../translators/ParDoBoundMultiTranslator.java | 14 ++- .../functions/ApexFlattenOperator.java | 3 +- .../translators/ParDoBoundTranslatorTest.java | 96 +++- 3 files changed, 107 insertions(+), 6 deletions(-) --
[3/4] incubator-beam git commit: Remove DoFnSignatures.INSTANCE
Remove DoFnSignatures.INSTANCE Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/efad9d47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/efad9d47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/efad9d47 Branch: refs/heads/master Commit: efad9d47f34694dc730f2d9e663cb61cba307679 Parents: f7745dc Author: Kenneth KnowlesAuthored: Mon Nov 7 22:36:09 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 14:18:07 2016 -0800 -- .../beam/runners/core/SimpleDoFnRunner.java | 2 +- .../beam/runners/core/SplittableParDo.java | 2 +- .../runners/direct/ParDoOverrideFactory.java| 2 +- .../beam/sdk/transforms/DoFnAdapters.java | 2 +- .../org/apache/beam/sdk/transforms/ParDo.java | 6 +- .../sdk/transforms/reflect/DoFnInvokers.java| 2 +- .../sdk/transforms/reflect/DoFnSignatures.java | 11 ++-- .../sdk/transforms/reflect/OnTimerInvokers.java | 2 +- .../DoFnSignaturesProcessElementTest.java | 12 ++-- .../DoFnSignaturesSplittableDoFnTest.java | 34 +- .../transforms/reflect/DoFnSignaturesTest.java | 66 ++-- 11 files changed, 70 insertions(+), 71 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/efad9d47/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 3abb06b..2c5a850 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -96,7 +96,7 @@ public class SimpleDoFnRunner implements DoFnRunner fn) { checkNotNull(fn, "fn must not be null"); this.fn = fn; -this.signature = DoFnSignatures.INSTANCE.getSignature(fn.getClass()); +this.signature = DoFnSignatures.getSignature(fn.getClass()); checkArgument(signature.processElement().isSplittable(), "fn must be a splittable DoFn"); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/efad9d47/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java index 6052a41..27941f8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java @@ -44,7 +44,7 @@ class ParDoOverrideFactory // This is an OldDoFn, hence not splittable. return transform; } -DoFnSignature signature = DoFnSignatures.INSTANCE.getSignature(fn.getClass()); +DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); if (!signature.processElement().isSplittable()) { return transform; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/efad9d47/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
[1/4] incubator-beam git commit: Remove OnTimerInvokers.INSTANCE; deprecate DoFnInvokers.INSTANCE
Repository: incubator-beam Updated Branches: refs/heads/master f7745dc29 -> 3f8db06bd http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 455e49b..64454e4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -17,74 +17,34 @@ */ package org.apache.beam.sdk.transforms.reflect; -import static com.google.common.base.Preconditions.checkArgument; - +import java.io.Serializable; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import javax.annotation.Nullable; -import net.bytebuddy.ByteBuddy; -import net.bytebuddy.NamingStrategy; -import net.bytebuddy.description.field.FieldDescription; -import net.bytebuddy.description.method.MethodDescription; -import net.bytebuddy.description.modifier.FieldManifestation; -import net.bytebuddy.description.modifier.Visibility; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.description.type.TypeList; -import net.bytebuddy.dynamic.DynamicType; -import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; -import net.bytebuddy.dynamic.scaffold.InstrumentedType; -import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; -import net.bytebuddy.implementation.ExceptionMethod; -import net.bytebuddy.implementation.FixedValue; -import net.bytebuddy.implementation.Implementation; -import net.bytebuddy.implementation.Implementation.Context; -import net.bytebuddy.implementation.MethodDelegation; -import net.bytebuddy.implementation.bind.annotation.TargetMethodAnnotationDrivenBinder; -import net.bytebuddy.implementation.bytecode.ByteCodeAppender; -import net.bytebuddy.implementation.bytecode.StackManipulation; -import net.bytebuddy.implementation.bytecode.Throw; -import net.bytebuddy.implementation.bytecode.assign.Assigner; -import net.bytebuddy.implementation.bytecode.assign.TypeCasting; -import net.bytebuddy.implementation.bytecode.constant.TextConstant; -import net.bytebuddy.implementation.bytecode.member.FieldAccess; -import net.bytebuddy.implementation.bytecode.member.MethodInvocation; -import net.bytebuddy.implementation.bytecode.member.MethodReturn; -import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess; -import net.bytebuddy.jar.asm.Label; -import net.bytebuddy.jar.asm.MethodVisitor; -import net.bytebuddy.jar.asm.Opcodes; -import net.bytebuddy.jar.asm.Type; -import net.bytebuddy.matcher.ElementMatchers; -import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; -import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.values.TypeDescriptor; -/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */ +/** Static utilities for working with {@link DoFnInvoker}. */ public class DoFnInvokers { - public static final DoFnInvokers INSTANCE = new DoFnInvokers(); - private static final String FN_DELEGATE_FIELD_NAME = "delegate"; + /** + * Returns an {@link DoFnInvoker} for the given {@link DoFn}, using a default choice of {@link + * DoFnInvokerFactory}. + * + * The default is permitted to change at any time. Users of this method may not depend on any + * details {@link DoFnInvokerFactory}-specific details of the invoker. Today it is {@link + *
[2/4] incubator-beam git commit: Remove OnTimerInvokers.INSTANCE; deprecate DoFnInvokers.INSTANCE
Remove OnTimerInvokers.INSTANCE; deprecate DoFnInvokers.INSTANCE Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/14a71e43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/14a71e43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/14a71e43 Branch: refs/heads/master Commit: 14a71e435acd9435ce02afe774df3adebd7355f0 Parents: efad9d4 Author: Kenneth KnowlesAuthored: Mon Nov 7 23:03:46 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 14:18:07 2016 -0800 -- .../beam/runners/core/SimpleDoFnRunner.java | 2 +- .../beam/runners/core/SplittableParDo.java | 10 +- .../runners/direct/DoFnLifecycleManager.java| 4 +- .../beam/sdk/transforms/DoFnAdapters.java | 4 +- .../reflect/ByteBuddyDoFnInvokerFactory.java| 828 +++ .../reflect/ByteBuddyOnTimerInvokerFactory.java | 279 +++ .../transforms/reflect/DoFnInvokerFactory.java | 27 + .../sdk/transforms/reflect/DoFnInvokers.java| 711 +--- .../reflect/OnTimerInvokerFactory.java | 36 + .../sdk/transforms/reflect/OnTimerInvokers.java | 243 +- .../transforms/reflect/DoFnInvokersTest.java| 24 +- .../transforms/reflect/OnTimerInvokersTest.java | 2 +- .../transforms/DoFnInvokersBenchmark.java | 2 +- 13 files changed, 1227 insertions(+), 945 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 2c5a850..3b784d1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -97,7 +97,7 @@ public class SimpleDoFnRunner implements DoFnRunner > splitCoder = ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder); @@ -166,7 +166,7 @@ public class SplittableParDo< @Setup public void setup() { - invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + invoker = DoFnInvokers.invokerFor(fn); } @ProcessElement @@ -246,7 +246,7 @@ public class SplittableParDo< @Override public void setup() throws Exception { - invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + invoker = DoFnInvokers.invokerFor(fn); } @Override @@ -460,7 +460,7 @@ public class SplittableParDo< @Setup public void setup() { - invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(splittableFn); + invoker = DoFnInvokers.invokerFor(splittableFn); } @ProcessElement http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14a71e43/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
[2/4] incubator-beam git commit: Update examples archetype to match examples
Update examples archetype to match examples Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e132ee8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e132ee8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e132ee8 Branch: refs/heads/master Commit: 1e132ee83d5f393498c12003a328e51d0e93bd06 Parents: 9f78c44 Author: Kenneth KnowlesAuthored: Thu Nov 10 12:06:42 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 14:06:39 2016 -0800 -- .../src/main/java/DebuggingWordCount.java | 69 +--- .../src/main/java/MinimalWordCount.java | 52 +++ .../src/main/java/WindowedWordCount.java| 6 +- .../src/main/java/WordCount.java| 64 +++--- .../common/ExampleBigQueryTableOptions.java | 2 +- .../src/main/java/common/ExampleOptions.java| 5 ++ ...xamplePubsubTopicAndSubscriptionOptions.java | 2 +- .../java/common/ExamplePubsubTopicOptions.java | 2 +- .../src/main/java/common/ExampleUtils.java | 3 +- .../src/test/java/DebuggingWordCountTest.java | 15 - .../src/test/java/WordCountTest.java| 7 +- 11 files changed, 91 insertions(+), 136 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java -- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java index 9727379..99ae796 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java @@ -50,10 +50,9 @@ import org.slf4j.LoggerFactory; * * New Concepts: * - * 1. Logging to Cloud Logging - * 2. Controlling worker log levels - * 3. Creating a custom aggregator - * 4. Testing your Pipeline via PAssert + * 1. Logging using SLF4J, even in a distributed environment + * 2. Creating a custom aggregator (runners have varying levels of support) + * 3. Testing your Pipeline via PAssert * * * To execute this pipeline locally, specify general pipeline configuration: @@ -68,51 +67,20 @@ import org.slf4j.LoggerFactory; * } * * - * To use the additional logging discussed below, specify: - * {@code - * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} - * } - * - * - * Note that when you run via mvn exec, you may need to escape - * the quotations as appropriate for your shell. For example, in bash: - * - * mvn compile exec:java ... \ - * -Dexec.args="... \ - * --workerLogLevelOverrides={\\\"org.apache.beam.examples\\\":\\\"DEBUG\\\"}" - * + * The input file defaults to a public data set containing the text of of King Lear, + * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. * - * Concept #2: Dataflow workers which execute user code are configured to log to Cloud - * Logging by default at "INFO" log level and higher. One may override log levels for specific - * logging namespaces by specifying: - * - * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...} - * - * For example, by specifying: - * - * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} - * - * when executing this pipeline using the Dataflow service, Cloud Logging would contain only - * "DEBUG" or higher level logs for the {@code org.apache.beam.examples} package in - * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker - * logging configuration can be overridden by specifying - * {@code --defaultWorkerLogLevel=}. For example, - * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with - * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note - * that changing the default worker log level to TRACE or DEBUG will significantly increase - * the amount of logs output. - * - * The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} - * and can be overridden with {@code --inputFile}. */ public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn , KV > { /** - * Concept #1: The logger below uses the fully qualified
[4/4] incubator-beam git commit: This closes #1338
This closes #1338 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/503b4071 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/503b4071 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/503b4071 Branch: refs/heads/master Commit: 503b40717ee75e7ef7ed687f7ac797dfa4543db1 Parents: e43a383 cd5eca8 Author: Kenneth KnowlesAuthored: Thu Nov 10 14:06:40 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 14:06:40 2016 -0800 -- .../src/main/java/DebuggingWordCount.java | 69 +--- .../src/main/java/MinimalWordCount.java | 52 +++ .../src/main/java/WindowedWordCount.java| 6 +- .../src/main/java/WordCount.java| 64 +++--- .../common/ExampleBigQueryTableOptions.java | 2 +- .../src/main/java/common/ExampleOptions.java| 5 ++ ...xamplePubsubTopicAndSubscriptionOptions.java | 2 +- .../java/common/ExamplePubsubTopicOptions.java | 2 +- .../src/main/java/common/ExampleUtils.java | 3 +- .../src/test/java/DebuggingWordCountTest.java | 15 - .../src/test/java/WordCountTest.java| 7 +- .../update-examples-archetype.sh| 59 + 12 files changed, 150 insertions(+), 136 deletions(-) --
[1/4] incubator-beam git commit: fixup! Add script
Repository: incubator-beam Updated Branches: refs/heads/master e43a38355 -> 503b40717 fixup! Add script Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd5eca8d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd5eca8d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd5eca8d Branch: refs/heads/master Commit: cd5eca8df1d38a1c8ab2a44581bc0272b20bf609 Parents: 1e132ee Author: Kenneth KnowlesAuthored: Thu Nov 10 12:53:15 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 14:06:39 2016 -0800 -- .../update-examples-archetype.sh| 26 +++- 1 file changed, 25 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd5eca8d/sdks/java/maven-archetypes/update-examples-archetype.sh -- diff --git a/sdks/java/maven-archetypes/update-examples-archetype.sh b/sdks/java/maven-archetypes/update-examples-archetype.sh index e61c916..3c222be 100755 --- a/sdks/java/maven-archetypes/update-examples-archetype.sh +++ b/sdks/java/maven-archetypes/update-examples-archetype.sh @@ -1,8 +1,32 @@ -#!/bin/bash -ex +#!/bin/bash -e +# +#Licensed to the Apache Software Foundation (ASF) under one or more +#contributor license agreements. See the NOTICE file distributed with +#this work for additional information regarding copyright ownership. +#The ASF licenses this file to You under the Apache License, Version 2.0 +#(the "License"); you may not use this file except in compliance with +#the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. +# +# Updates the examples archetype to match the examples module +# +# Usage: Invoke with no arguments from any working directory. + +# The directory of this script. Assumes root of the maven-archetypes module. HERE="$(dirname $0)" +# The directory of the examples-java module EXAMPLES_ROOT="${HERE}/../../../examples/java" + +# The root of the examples archetype ARCHETYPE_ROOT="${HERE}/examples/src/main/resources/archetype-resources" #
[3/4] incubator-beam git commit: Add script to update examples archetype automatically
Add script to update examples archetype automatically Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f78c443 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f78c443 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f78c443 Branch: refs/heads/master Commit: 9f78c443d70cb4e01d0b6b9d0cb64902d70c9b87 Parents: e43a383 Author: Kenneth KnowlesAuthored: Thu Nov 10 11:46:24 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 14:06:39 2016 -0800 -- .../update-examples-archetype.sh| 35 1 file changed, 35 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f78c443/sdks/java/maven-archetypes/update-examples-archetype.sh -- diff --git a/sdks/java/maven-archetypes/update-examples-archetype.sh b/sdks/java/maven-archetypes/update-examples-archetype.sh new file mode 100755 index 000..e61c916 --- /dev/null +++ b/sdks/java/maven-archetypes/update-examples-archetype.sh @@ -0,0 +1,35 @@ +#!/bin/bash -ex + +HERE="$(dirname $0)" + +EXAMPLES_ROOT="${HERE}/../../../examples/java" +ARCHETYPE_ROOT="${HERE}/examples/src/main/resources/archetype-resources" + +# +# Copy the Java subset of the examples project verbatim. +# +rsync -a --exclude cookbook --exclude complete \ +"${EXAMPLES_ROOT}"/src/main/java/org/apache/beam/examples/ \ +"${ARCHETYPE_ROOT}/src/main/java" + +rsync -a --exclude cookbook --exclude complete --exclude '*IT.java' \ +"${EXAMPLES_ROOT}"/src/test/java/org/apache/beam/examples/\ +"${ARCHETYPE_ROOT}/src/test/java" + +# +# Replace 'package org.apache.beam.examples' with 'package ${package}' in all Java code +# +find "${ARCHETYPE_ROOT}/src/main/java" -name '*.java' -print0 \ +| xargs -0 sed -i 's/^package org\.apache\.beam\.examples/package ${package}/g' + +find "${ARCHETYPE_ROOT}/src/test/java" -name '*.java' -print0 \ +| xargs -0 sed -i 's/^package org\.apache\.beam\.examples/package ${package}/g' + +# +# Replace 'import org.apache.beam.examples.' with 'import ${package}.' in all Java code +# +find "${ARCHETYPE_ROOT}/src/main/java" -name '*.java' -print0 \ +| xargs -0 sed -i 's/^import org\.apache\.beam\.examples/import ${package}/g' + +find "${ARCHETYPE_ROOT}/src/test/java" -name '*.java' -print0 \ +| xargs -0 sed -i 's/^import org\.apache\.beam\.examples/import ${package}/g'
[1/6] incubator-beam git commit: This closes #1320
Repository: incubator-beam Updated Branches: refs/heads/master ab06647f9 -> 11eaed19a This closes #1320 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11eaed19 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11eaed19 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11eaed19 Branch: refs/heads/master Commit: 11eaed19a471ecc0c3fa1a57fb14c93ab21437a7 Parents: ab06647 bf39dc6 Author: Kenneth KnowlesAuthored: Thu Nov 10 10:49:28 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 10:49:28 2016 -0800 -- .../beam/examples/WindowedWordCountIT.java | 14 -- .../dataflow/testing/TestDataflowRunner.java| 20 ++-- .../beam/sdk/testing/TestPipelineOptions.java | 6 ++ 3 files changed, 32 insertions(+), 8 deletions(-) --
[2/6] incubator-beam git commit: Make test timeout configurable and use in TestDataflowRunner
Make test timeout configurable and use in TestDataflowRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e82e35b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e82e35b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e82e35b Branch: refs/heads/master Commit: 7e82e35b2476db6b2b491861e5c2c52042ce2161 Parents: 5653b86 Author: Kenneth KnowlesAuthored: Wed Nov 9 19:24:44 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 10:49:28 2016 -0800 -- .../beam/runners/dataflow/testing/TestDataflowRunner.java | 10 +++--- .../org/apache/beam/sdk/testing/TestPipelineOptions.java | 6 ++ 2 files changed, 13 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e82e35b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 10c72b7..70c3f58 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -135,10 +135,14 @@ public class TestDataflowRunner extends PipelineRunner { } } }); -State finalState = job.waitUntilFinish(Duration.standardMinutes(10L), messageHandler); +State finalState = +job.waitUntilFinish( +Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler); if (finalState == null || finalState == State.RUNNING) { - LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.", - job.getJobId()); + LOG.info( + "Dataflow job {} took longer than {} seconds to complete, cancelling.", + job.getJobId(), + options.getTestTimeoutSeconds()); job.cancel(); } success = resultFuture.get(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e82e35b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java index ff553ba..0739381 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.testing; +import javax.annotation.Nullable; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -41,6 +42,11 @@ public interface TestPipelineOptions extends PipelineOptions { SerializableMatcher getOnSuccessMatcher(); void setOnSuccessMatcher(SerializableMatcher value); + @Default.Long(10 * 60) + @Nullable + Long getTestTimeoutSeconds(); + void setTestTimeoutSeconds(Long value); + /** * Factory for {@link PipelineResult} matchers which always pass. */
[6/6] incubator-beam git commit: Make TestDataflowRunner crash message actionable
Make TestDataflowRunner crash message actionable Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5653b860 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5653b860 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5653b860 Branch: refs/heads/master Commit: 5653b860dbe64f61707c093fff5bf14061e772d0 Parents: 7344914 Author: Kenneth KnowlesAuthored: Wed Nov 9 14:36:54 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 10:49:28 2016 -0800 -- .../apache/beam/runners/dataflow/testing/TestDataflowRunner.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5653b860/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 9dacfd3..10c72b7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -152,7 +152,9 @@ public class TestDataflowRunner extends PipelineRunner { } else if (!success.get()) { throw new AssertionError( Strings.isNullOrEmpty(messageHandler.getErrorMessage()) -? "The dataflow did not return a failure reason." +? String.format( +"Dataflow job %s terminated in state %s but did not return a failure reason.", +job.getJobId(), job.getState()) : messageHandler.getErrorMessage()); } else { assertThat(job, testPipelineOptions.getOnSuccessMatcher());
[4/6] incubator-beam git commit: Fix null or empty check in TestDataflowRunner
Fix null or empty check in TestDataflowRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/73449146 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/73449146 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/73449146 Branch: refs/heads/master Commit: 7344914638953a78251d021f4001e404e8c1aff1 Parents: e5b2f13 Author: Kenneth KnowlesAuthored: Wed Nov 9 13:39:57 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 10:49:28 2016 -0800 -- .../beam/runners/dataflow/testing/TestDataflowRunner.java| 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/73449146/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 0f141d2..9dacfd3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.MetricUpdate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.base.Strings; import com.google.common.base.Throwables; import java.io.IOException; import java.math.BigDecimal; @@ -149,9 +150,10 @@ public class TestDataflowRunner extends PipelineRunner { throw new IllegalStateException( "The dataflow did not output a success or failure metric."); } else if (!success.get()) { -throw new AssertionError(messageHandler.getErrorMessage() == null -? "The dataflow did not return a failure reason." -: messageHandler.getErrorMessage()); +throw new AssertionError( +Strings.isNullOrEmpty(messageHandler.getErrorMessage()) +? "The dataflow did not return a failure reason." +: messageHandler.getErrorMessage()); } else { assertThat(job, testPipelineOptions.getOnSuccessMatcher()); }
[3/6] incubator-beam git commit: Fix breakage in WindowedWordCountIT
Fix breakage in WindowedWordCountIT Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e5b2f13f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e5b2f13f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e5b2f13f Branch: refs/heads/master Commit: e5b2f13f0116b0cd954b2214ca33cbf5a46a7261 Parents: ab06647 Author: Kenneth KnowlesAuthored: Wed Nov 9 10:22:51 2016 -0800 Committer: Kenneth Knowles Committed: Thu Nov 10 10:49:28 2016 -0800 -- .../org/apache/beam/examples/WindowedWordCountIT.java | 13 +++-- 1 file changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5b2f13f/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index c78fad6..ccc2d5e 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -18,13 +18,14 @@ package org.apache.beam.examples; import java.io.IOException; -import org.apache.beam.examples.WindowedWordCount.Options; +import java.util.Date; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.BigqueryMatcher; import org.apache.beam.sdk.testing.StreamingIT; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.util.IOChannelUtils; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -43,7 +44,7 @@ public class WindowedWordCountIT { * Options for the {@link WindowedWordCount} Integration Test. */ public interface WindowedWordCountITOptions - extends Options, TestPipelineOptions, StreamingOptions { + extends WindowedWordCount.Options, TestPipelineOptions, StreamingOptions { } @BeforeClass @@ -67,6 +68,14 @@ public class WindowedWordCountIT { TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class); options.setStreaming(isStreaming); +// Note: currently unused because the example writes to BigQuery, but WindowedWordCount.Options +// are tightly coupled to WordCount.Options, where the option is required. +options.setOutput(IOChannelUtils.resolve( +options.getTempRoot(), +String.format("WindowedWordCountIT-%tF-%
[1/2] incubator-beam git commit: This closes #1318
Repository: incubator-beam Updated Branches: refs/heads/master f802919c2 -> ef750c0f8 This closes #1318 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef750c0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef750c0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef750c0f Branch: refs/heads/master Commit: ef750c0f89b6319b66a2ae629124b3005a6aed6f Parents: f802919 047911d Author: Kenneth KnowlesAuthored: Wed Nov 9 14:47:23 2016 -0800 Committer: Kenneth Knowles Committed: Wed Nov 9 14:47:23 2016 -0800 -- runners/core-java/pom.xml | 12 .../apache/beam/runners/core/SplittableParDo.java | 1 - .../core/UnboundedReadFromBoundedSource.java| 3 ++- .../runners/core/triggers/TriggerStateMachines.java | 16 +++- 4 files changed, 17 insertions(+), 15 deletions(-) --
[2/2] incubator-beam git commit: Fix findbugs errors and re-enable for runners-core
Fix findbugs errors and re-enable for runners-core Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/047911d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/047911d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/047911d3 Branch: refs/heads/master Commit: 047911d392dc2def547c318aecaf716abe3199b8 Parents: f802919 Author: Kenneth KnowlesAuthored: Tue Nov 8 14:49:09 2016 -0800 Committer: Kenneth Knowles Committed: Wed Nov 9 14:47:23 2016 -0800 -- runners/core-java/pom.xml | 12 .../apache/beam/runners/core/SplittableParDo.java | 1 - .../core/UnboundedReadFromBoundedSource.java| 3 ++- .../runners/core/triggers/TriggerStateMachines.java | 16 +++- 4 files changed, 17 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047911d3/runners/core-java/pom.xml -- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index aa5f145..40ebf58 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -33,18 +33,6 @@ jar - - - - - org.codehaus.mojo - findbugs-maven-plugin - -true - - - - org.apache.maven.plugins http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047911d3/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index d8ee1d5..cea75b3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -241,7 +241,6 @@ public class SplittableParDo< this.windowCoder = windowCoder; elementTag = StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder)); - DoFnInvoker invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); restrictionTag = StateTags.value("restriction", restrictionCoder); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047911d3/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index 2afdcf2..29dc57e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -469,7 +469,8 @@ public class UnboundedReadFromBoundedSource extends PTransform
[4/4] incubator-beam git commit: This closes #1315
This closes #1315 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/084a5e8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/084a5e8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/084a5e8a Branch: refs/heads/master Commit: 084a5e8aedfdd53f3d76ad48b55a66a282881646 Parents: e35f571 9bc9c3f Author: Kenneth KnowlesAuthored: Tue Nov 8 13:52:23 2016 -0800 Committer: Kenneth Knowles Committed: Tue Nov 8 13:52:23 2016 -0800 -- .../beam/examples/DebuggingWordCount.java | 67 +--- .../apache/beam/examples/MinimalWordCount.java | 50 +++ .../org/apache/beam/examples/WordCount.java | 62 +++--- .../beam/examples/DebuggingWordCountTest.java | 15 - 4 files changed, 73 insertions(+), 121 deletions(-) --
[2/4] incubator-beam git commit: Revise WordCount example to be better cross-runner example
Revise WordCount example to be better cross-runner example Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b05a8c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b05a8c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b05a8c7 Branch: refs/heads/master Commit: 0b05a8c7ff8e1f76516a6b13d504f776b5c9111e Parents: c64cf36 Author: Kenneth KnowlesAuthored: Thu Nov 3 14:19:47 2016 -0700 Committer: Kenneth Knowles Committed: Tue Nov 8 13:51:25 2016 -0800 -- .../org/apache/beam/examples/WordCount.java | 62 1 file changed, 23 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0b05a8c7/examples/java/src/main/java/org/apache/beam/examples/WordCount.java -- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index e7eab6e..5be0ddc 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -17,15 +17,13 @@ */ package org.apache.beam.examples; -import com.google.common.base.Strings; -import java.io.IOException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; @@ -34,8 +32,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -53,7 +49,7 @@ import org.apache.beam.sdk.values.PCollection; * * * Basic concepts, also in the MinimalWordCount example: - * Reading text files; counting a PCollection; writing to GCS. + * Reading text files; counting a PCollection; writing to text files * * New Concepts: * @@ -63,30 +59,31 @@ import org.apache.beam.sdk.values.PCollection; * 4. Defining your own pipeline options * * - * Concept #1: you can execute this pipeline either locally or using the selected runner. + * Concept #1: you can execute this pipeline either locally or using by selecting another runner. * These are now command-line options and not hard-coded as they were in the MinimalWordCount * example. - * To execute this pipeline locally, specify a local output file or output prefix on GCS: - * {@code - * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] - * } * * To change the runner, specify: * {@code * --runner=YOUR_SELECTED_RUNNER * } * - * See examples/java/README.md for instructions about how to configure different runners. * - * The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} - * and can be overridden with {@code --inputFile}. + * To execute this pipeline, specify a local output file (if using the + * {@code DirectRunner}) or output prefix on a supported distributed file system. + * {@code + * --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX] + * } + * + * The input file defaults to a public data set containing the text of of King Lear, + * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. */ public class WordCount { /** - * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out- - * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the - * pipeline. + * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns + * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it + * to a ParDo in the pipeline. */ static class ExtractWordsFn extends DoFn { private final Aggregator emptyLines = @@ -153,36 +150,23 @@ public class WordCount { * Inherits standard configuration options. */ public interface WordCountOptions extends PipelineOptions { + +/** + * By default, this example reads from a public dataset containing
[1/4] incubator-beam git commit: Hardcode MinimalWordCount to the DirectRunner
Repository: incubator-beam Updated Branches: refs/heads/master e35f571b0 -> 084a5e8ae Hardcode MinimalWordCount to the DirectRunner This makes it easy to immediately run, and removes various non-portable instructions and others that aren't the easiest for a "Getting Started" scenario. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c64cf367 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c64cf367 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c64cf367 Branch: refs/heads/master Commit: c64cf367299b6fdbe25c62eec9840b02fbc9d518 Parents: e35f571 Author: Kenneth KnowlesAuthored: Thu Nov 3 14:18:43 2016 -0700 Committer: Kenneth Knowles Committed: Tue Nov 8 13:51:23 2016 -0800 -- .../apache/beam/examples/MinimalWordCount.java | 50 +--- 1 file changed, 22 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c64cf367/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java -- diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java index 6fc873e..6085539 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java @@ -37,46 +37,33 @@ import org.apache.beam.sdk.values.KV; * argument processing, and focus on construction of the pipeline, which chains together the * application of core transforms. * - * Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally - * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional + * Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the + * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional * concepts. * * Concepts: + * * * 1. Reading data from text files * 2. Specifying 'inline' transforms - * 3. Counting a PCollection - * 4. Writing data to Cloud Storage as text files + * 3. Counting items in a PCollection + * 4. Writing data to text files * * - * To execute this pipeline, first edit the code to set your project ID, the temp - * location, and the output location. The specified GCS bucket(s) must already exist. - * - * Then, run the pipeline as described in the README. It will be deployed and run with the - * selected runner. No args are required to run the pipeline. You can see the results in your - * output bucket in the GCS browser. + * No arguments are required to run this pipeline. It will be executed with the DirectRunner. You + * can see the results in the output files in your current working directory, with names like + * "wordcounts-1-of-5. When running on a distributed service, you would use an appropriate + * file service. */ public class MinimalWordCount { public static void main(String[] args) { // Create a PipelineOptions object. This object lets us set various execution -// options for our pipeline, such as the associated Cloud Platform project and the location -// in Google Cloud Storage to stage files. +// options for our pipeline, such as the runner you wish to use. This example +// will run with the DirectRunner by default, based on the class path configured +// in its dependencies. PipelineOptions options = PipelineOptionsFactory.create(); -// In order to run your pipeline, you need to make following runner specific changes: -// -// CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner. -// CHANGE 2/3: Specify runner-required options. -// For DataflowRunner, set project and temp location as follows: -// DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); -// dataflowOptions.setRunner(DataflowRunner.class); -// dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); -// dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); -// For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions} -// for more details. -// options.setRunner(FlinkRunner.class); - // Create the Pipeline object with the options we defined above. Pipeline p = Pipeline.create(options); @@ -85,7 +72,10 @@ public class MinimalWordCount { // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set // of input text files. TextIO.Read returns a PCollection where each element is
[3/4] incubator-beam git commit: Revise DebuggingWordCount to be more portable
Revise DebuggingWordCount to be more portable Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9bc9c3f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9bc9c3f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9bc9c3f0 Branch: refs/heads/master Commit: 9bc9c3f0fcab4571f60d4eb872df0904ee0eb99d Parents: 0b05a8c Author: Kenneth KnowlesAuthored: Thu Nov 3 14:50:02 2016 -0700 Committer: Kenneth Knowles Committed: Tue Nov 8 13:51:25 2016 -0800 -- .../beam/examples/DebuggingWordCount.java | 67 +--- .../beam/examples/DebuggingWordCountTest.java | 15 - 2 files changed, 28 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bc9c3f0/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java -- diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 1d2c83a..f7c537c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -50,10 +50,9 @@ import org.slf4j.LoggerFactory; * * New Concepts: * - * 1. Logging to Cloud Logging - * 2. Controlling worker log levels - * 3. Creating a custom aggregator - * 4. Testing your Pipeline via PAssert + * 1. Logging using SLF4J, even in a distributed environment + * 2. Creating a custom aggregator (runners have varying levels of support) + * 3. Testing your Pipeline via PAssert * * * To execute this pipeline locally, specify general pipeline configuration: @@ -68,51 +67,20 @@ import org.slf4j.LoggerFactory; * } * * - * To use the additional logging discussed below, specify: - * {@code - * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} - * } - * - * - * Note that when you run via mvn exec, you may need to escape - * the quotations as appropriate for your shell. For example, in bash: - * - * mvn compile exec:java ... \ - * -Dexec.args="... \ - * --workerLogLevelOverrides={\\\"org.apache.beam.examples\\\":\\\"DEBUG\\\"}" - * + * The input file defaults to a public data set containing the text of of King Lear, + * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. * - * Concept #2: Dataflow workers which execute user code are configured to log to Cloud - * Logging by default at "INFO" log level and higher. One may override log levels for specific - * logging namespaces by specifying: - * - * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...} - * - * For example, by specifying: - * - * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} - * - * when executing this pipeline using the Dataflow service, Cloud Logging would contain only - * "DEBUG" or higher level logs for the {@code org.apache.beam.examples} package in - * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker - * logging configuration can be overridden by specifying - * {@code --defaultWorkerLogLevel=}. For example, - * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with - * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note - * that changing the default worker log level to TRACE or DEBUG will significantly increase - * the amount of logs output. - * - * The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} - * and can be overridden with {@code --inputFile}. */ public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn , KV > { /** - * Concept #1: The logger below uses the fully qualified class name of FilterTextFn - * as the logger. All log statements emitted by this logger will be referenced by this name - * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging - * about the Cloud Logging UI. + * Concept #1: The logger below uses the fully qualified class name of FilterTextFn as the + * logger. Depending on your SLF4J configuration, log statements will likely be qualified by + * this name. + * + * Note that this is entirely standard SLF4J usage. Some runners may provide a default SLF4J + * configuration that is most appropriate for their logging integration. */ private static final Logger LOG =
[3/3] incubator-beam git commit: Add Spark and Flink runner version to root pom
Add Spark and Flink runner version to root pom Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32b93815 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32b93815 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32b93815 Branch: refs/heads/master Commit: 32b9381525bce5781a2d523175bdc34cd114299f Parents: 09f6aa6 Author: Kenneth KnowlesAuthored: Tue Nov 8 11:53:22 2016 -0800 Committer: Kenneth Knowles Committed: Tue Nov 8 13:39:06 2016 -0800 -- pom.xml | 12 1 file changed, 12 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32b93815/pom.xml -- diff --git a/pom.xml b/pom.xml index bd6037e..749ca9c 100644 --- a/pom.xml +++ b/pom.xml @@ -352,6 +352,18 @@ org.apache.beam +beam-runners-spark +${project.version} + + + +org.apache.beam +beam-runners-flink_2.10 +${project.version} + + + +org.apache.beam beam-examples-java ${project.version}
[1/3] incubator-beam git commit: This closes #1314
Repository: incubator-beam Updated Branches: refs/heads/master 09f6aa607 -> e35f571b0 This closes #1314 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e35f571b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e35f571b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e35f571b Branch: refs/heads/master Commit: e35f571b0df9e1b74013632c30743a1ddecd3e2a Parents: 09f6aa6 8079284 Author: Kenneth KnowlesAuthored: Tue Nov 8 13:39:06 2016 -0800 Committer: Kenneth Knowles Committed: Tue Nov 8 13:39:06 2016 -0800 -- examples/java/pom.xml | 62 ++ pom.xml | 12 + 2 files changed, 42 insertions(+), 32 deletions(-) --
[2/3] incubator-beam git commit: Add runner-specific profiles to the examples pom.xml
Add runner-specific profiles to the examples pom.xml Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/80792848 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/80792848 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/80792848 Branch: refs/heads/master Commit: 807928482706db1bd7904207487adc65c5f435b6 Parents: 32b9381 Author: Kenneth KnowlesAuthored: Thu Nov 3 16:17:18 2016 -0700 Committer: Kenneth Knowles Committed: Tue Nov 8 13:39:06 2016 -0800 -- examples/java/pom.xml | 62 ++ 1 file changed, 30 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/80792848/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 6d18a0f..f66fd36 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -39,22 +39,13 @@ + - include-runners + direct-runner true @@ -62,41 +53,43 @@ org.apache.beam beam-runners-direct-java - ${project.version} runtime - true + + + + + flink-runner + org.apache.beam beam-runners-flink_2.10 - ${project.version} runtime - true + + + + + spark-runner + org.apache.beam - beam-runners-google-cloud-dataflow-java - ${project.version} + beam-runners-spark runtime - true - - org.apache.beam - beam-runners-spark - ${project.version} + org.apache.spark + spark-streaming_2.10 + ${spark.version} runtime - true - org.apache.spark spark-core_2.10 ${spark.version} runtime - true org.slf4j @@ -104,16 +97,22 @@ + + + + + dataflow-runner + - org.apache.spark - spark-streaming_2.10 - ${spark.version} + org.apache.beam + beam-runners-google-cloud-dataflow-java runtime - true + + jenkins-precommit @@ -448,7 +447,6 @@ org.apache.beam beam-runners-direct-java - ${project.version} test
[49/50] incubator-beam git commit: [BEAM-79] update GearpumpPipelineResult
[BEAM-79] update GearpumpPipelineResult Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a96a17f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a96a17f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a96a17f Branch: refs/heads/gearpump-runner Commit: 2a96a17f2fe9a391ec5b2f0b4bea223530c1ba34 Parents: 0c36228 Author: manuzhangAuthored: Tue Nov 8 11:25:27 2016 +0800 Committer: manuzhang Committed: Tue Nov 8 11:30:26 2016 +0800 -- .../org/apache/beam/runners/gearpump/GearpumpPipelineResult.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a96a17f/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java -- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java index e7c621e..ed1201d 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java @@ -44,12 +44,12 @@ public class GearpumpPipelineResult implements PipelineResult { } @Override - public State waitUntilFinish(Duration duration) throws IOException, InterruptedException { + public State waitUntilFinish(Duration duration) { return null; } @Override - public State waitUntilFinish() throws IOException, InterruptedException { + public State waitUntilFinish() { return null; }
[19/50] incubator-beam git commit: Rename RegexTransform to just Regex
Rename RegexTransform to just Regex Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6954abe4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6954abe4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6954abe4 Branch: refs/heads/gearpump-runner Commit: 6954abe4e28103f5ddd3e1eebe998b765cd9de11 Parents: f6a9733 Author: Kenneth KnowlesAuthored: Mon Nov 7 10:10:32 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 10:10:32 2016 -0800 -- .../org/apache/beam/sdk/transforms/Regex.java | 505 +++ .../beam/sdk/transforms/RegexTransform.java | 505 --- .../apache/beam/sdk/transforms/RegexTest.java | 262 ++ .../beam/sdk/transforms/RegexTransformTest.java | 262 -- 4 files changed, 767 insertions(+), 767 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6954abe4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java new file mode 100644 index 000..27104f6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@code PTransorm}s to use Regular Expressions to process elements in a + * {@link PCollection}. + * + * + * {@link Regex#matches(String, int)} can be used to see if an entire line matches + * a Regex. {@link Regex#matchesKV(String, int, int)} can be used to see if an entire + * line matches a Regex and output certain groups as a {@link KV}. + * + * + * {@link Regex#find(String, int)} can be used to see if a portion of a line + * matches a Regex. {@link Regex#matchesKV(String, int, int)} can be used to see if a + * portion of a line matches a Regex and output certain groups as a {@link KV}. + * + * + * Lines that do not match the Regex will not be output. + * + */ +public class Regex { + private Regex() { +// do not instantiate + } + + /** + * Returns a {@link Regex.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the entire line (group 0) as a + * {@link PCollection}. + * @param regex + * The regular expression to run + */ + public static Matches matches(String regex) { +return matches(regex, 0); + } + + /** + * Returns a {@link Regex.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the group as a + * {@link PCollection}. + * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Matches matches(String regex, int group) { +return new Matches(regex, group); + } + + /** + * Returns a {@link Regex.MatchesKV} {@link PTransform} that checks + * if the entire line matches the Regex. Returns the specified groups as the + * key and value as a {@link PCollection}. + * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use the value + */ + public static MatchesKV matchesKV(String regex, int keyGroup, + int valueGroup) { +return new MatchesKV(regex, keyGroup, valueGroup); + } + + /** + * Returns a {@link Regex.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the entire line (group 0) as + * a {@link PCollection}. + * @param regex + * The regular expression to run + */ +
[15/50] incubator-beam git commit: move source jar creation into release profile
move source jar creation into release profile Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68f26386 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68f26386 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68f26386 Branch: refs/heads/gearpump-runner Commit: 68f26386e6ba7c5a883ba34aff4618234811 Parents: 5864a38 Author: Dan HalperinAuthored: Sat Nov 5 01:45:57 2016 -0700 Committer: Dan Halperin Committed: Mon Nov 7 08:45:50 2016 -0800 -- examples/java/pom.xml | 6 -- examples/java8/pom.xml | 6 -- pom.xml| 5 + runners/core-java/pom.xml | 6 -- runners/direct-java/pom.xml| 6 -- runners/flink/examples/pom.xml | 4 runners/flink/runner/pom.xml | 4 runners/google-cloud-dataflow-java/pom.xml | 6 -- runners/spark/pom.xml | 4 sdks/java/core/pom.xml | 6 -- sdks/java/extensions/join-library/pom.xml | 4 sdks/java/extensions/sorter/pom.xml| 4 sdks/java/io/google-cloud-platform/pom.xml | 4 sdks/java/io/hdfs/pom.xml | 4 sdks/java/io/jdbc/pom.xml | 4 sdks/java/io/jms/pom.xml | 4 sdks/java/io/kafka/pom.xml | 4 sdks/java/io/kinesis/pom.xml | 4 sdks/java/io/mongodb/pom.xml | 4 sdks/java/java8tests/pom.xml | 6 -- sdks/java/microbenchmarks/pom.xml | 5 - 21 files changed, 5 insertions(+), 95 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68f26386/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 857373a..5d69bfe 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -277,12 +277,6 @@ - - -org.apache.maven.plugins -maven-source-plugin - - org.apache.maven.plugins maven-shade-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68f26386/examples/java8/pom.xml -- diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index 0a5dcb0..72f24ca 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -140,12 +140,6 @@ - - -org.apache.maven.plugins -maven-source-plugin - - org.apache.maven.plugins maven-jar-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68f26386/pom.xml -- diff --git a/pom.xml b/pom.xml index 77e87ba..c1bd5c8 100644 --- a/pom.xml +++ b/pom.xml @@ -220,6 +220,11 @@ org.apache.maven.plugins maven-javadoc-plugin + + +org.apache.maven.plugins +maven-source-plugin + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68f26386/runners/core-java/pom.xml -- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 3d8a84b..40ebf58 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -57,12 +57,6 @@ maven-jar-plugin - - -org.apache.maven.plugins -maven-source-plugin - - org.apache.maven.plugins maven-shade-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68f26386/runners/direct-java/pom.xml -- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 64e9b5a..8983b1c 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -51,12 +51,6 @@ maven-jar-plugin - - -org.apache.maven.plugins -maven-source-plugin - - org.apache.maven.plugins maven-surefire-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68f26386/runners/flink/examples/pom.xml -- diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml index 051ec2d..dccb5cb 100644 --- a/runners/flink/examples/pom.xml +++ b/runners/flink/examples/pom.xml @@ -90,10 +90,6 @@ org.apache.maven.plugins maven-jar-plugin - -org.apache.maven.plugins -
[12/50] incubator-beam git commit: Enable javadoc for all modules by default
Enable javadoc for all modules by default Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aae65db6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aae65db6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aae65db6 Branch: refs/heads/gearpump-runner Commit: aae65db663c27de5b9a21adedc2110cc21cd4061 Parents: 14e093a Author: Dan HalperinAuthored: Fri Nov 4 15:28:43 2016 -0700 Committer: Dan Halperin Committed: Fri Nov 4 18:29:21 2016 -0700 -- examples/java/pom.xml | 68 examples/java8/pom.xml | 8 --- pom.xml| 11 +++- runners/core-java/pom.xml | 23 runners/direct-java/pom.xml| 61 - runners/flink/examples/pom.xml | 8 --- runners/flink/runner/pom.xml | 8 --- runners/google-cloud-dataflow-java/pom.xml | 70 - runners/spark/pom.xml | 4 -- sdks/java/core/pom.xml | 70 - sdks/java/io/jdbc/pom.xml | 4 -- sdks/java/io/kafka/pom.xml | 7 --- sdks/java/io/kinesis/pom.xml | 7 --- sdks/java/io/mongodb/pom.xml | 4 -- 14 files changed, 10 insertions(+), 343 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aae65db6/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 12a114f..857373a 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -285,74 +285,6 @@ org.apache.maven.plugins -maven-javadoc-plugin - - Apache Beam Examples - Apache Beam Examples - - org.apache.beam.examples - -exclude org.apache.beam.sdk.runners.worker:org.apache.beam.sdk.runners.dataflow:org.apache.beam.sdk.util ${beam.javadoc_opts} - false - true - - - - - - https://cloud.google.com/dataflow/java-sdk/JavaDoc/ - ${basedir}/../../sdks/java/javadoc/dataflow-sdk-docs - - - - https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/ - ${basedir}/../../sdks/java/javadoc/apiclient-docs - - - http://avro.apache.org/docs/${avro.version}/api/java/ - ${basedir}/../../sdks/java/javadoc/avro-docs - - - https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/ - ${basedir}/../../sdks/java/javadoc/bq-docs - - - https://cloud.google.com/datastore/docs/apis/javadoc/ - ${basedir}/../../sdks/java/javadoc/datastore-docs - - - http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/ - ${basedir}/../../sdks/java/javadoc/guava-docs - - - http://fasterxml.github.io/jackson-annotations/javadoc/2.7/ - ${basedir}/../../sdks/java/javadoc/jackson-annotations-docs - - - http://fasterxml.github.io/jackson-databind/javadoc/2.7/ - ${basedir}/../../sdks/java/javadoc/jackson-databind-docs - - - http://www.joda.org/joda-time/apidocs - ${basedir}/../../sdks/java/javadoc/joda-docs - - - https://developers.google.com/api-client-library/java/google-oauth-java-client/reference/1.20.0/ - ${basedir}/../../sdks/java/javadoc/oauth-docs - - - - - - - jar - -package - - - - - -org.apache.maven.plugins maven-shade-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aae65db6/examples/java8/pom.xml -- diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index 18257d9..0a5dcb0 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -120,14 +120,6 @@ -org.apache.maven.plugins -maven-javadoc-plugin - - -Xdoclint:missing - - - - maven-compiler-plugin 1.8 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aae65db6/pom.xml
[48/50] incubator-beam git commit: Merge branch 'master' into gearpump-runner
Merge branch 'master' into gearpump-runner # Conflicts: # runners/pom.xml Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c362282 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c362282 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c362282 Branch: refs/heads/gearpump-runner Commit: 0c362282d3f2ca224c937c813f467723d7b1e908 Parents: 323ec11 99505e1 Author: manuzhangAuthored: Tue Nov 8 10:30:33 2016 +0800 Committer: manuzhang Committed: Tue Nov 8 10:30:33 2016 +0800 -- .travis.yml | 22 +- examples/java/pom.xml | 92 +-- .../beam/examples/DebuggingWordCount.java | 2 +- .../apache/beam/examples/MinimalWordCount.java | 2 +- .../org/apache/beam/examples/WordCount.java | 2 +- .../beam/examples/common/ExampleUtils.java | 3 +- .../apache/beam/examples/complete/TfIdf.java| 8 +- .../examples/complete/TopWikipediaSessions.java | 2 +- .../examples/cookbook/BigQueryTornadoes.java| 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/DeDupExample.java| 10 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../beam/examples/cookbook/JoinExamples.java| 2 +- .../examples/cookbook/MaxPerKeyExamples.java| 2 +- .../org/apache/beam/examples/cookbook/README.md | 2 +- .../beam/examples/WindowedWordCountIT.java | 15 +- .../org/apache/beam/examples/WordCountIT.java | 18 +- .../org/apache/beam/examples/WordCountTest.java | 2 +- .../examples/complete/AutoCompleteTest.java | 6 +- .../beam/examples/complete/TfIdfTest.java | 6 +- .../complete/TopWikipediaSessionsTest.java | 2 +- .../examples/cookbook/BigQueryTornadoesIT.java | 15 +- .../examples/cookbook/DeDupExampleTest.java | 82 -- .../examples/cookbook/DistinctExampleTest.java | 82 ++ .../examples/cookbook/JoinExamplesTest.java | 2 +- .../examples/cookbook/TriggerExampleTest.java | 2 +- examples/java8/pom.xml | 32 +- .../beam/examples/MinimalWordCountJava8.java| 2 +- .../beam/examples/complete/game/GameStats.java | 55 +- .../examples/complete/game/HourlyTeamScore.java | 30 +- .../examples/complete/game/LeaderBoard.java | 45 +- .../beam/examples/complete/game/UserScore.java | 18 +- .../complete/game/utils/WriteToBigQuery.java| 49 +- .../game/utils/WriteWindowedToBigQuery.java | 14 +- .../examples/MinimalWordCountJava8Test.java | 2 +- .../examples/complete/game/GameStatsTest.java | 2 +- .../complete/game/HourlyTeamScoreTest.java | 2 +- .../examples/complete/game/LeaderBoardTest.java | 10 +- .../examples/complete/game/UserScoreTest.java | 6 +- examples/pom.xml| 18 +- pom.xml | 123 ++- runners/core-java/pom.xml | 77 +- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 3 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 3 +- .../core/GroupByKeyViaGroupByKeyOnly.java | 13 +- .../core/PushbackSideInputDoFnRunner.java | 28 +- .../beam/runners/core/SimpleDoFnRunner.java | 12 + .../beam/runners/core/SplittableParDo.java | 12 + .../beam/runners/core/ReduceFnTester.java | 6 +- .../UnboundedReadFromBoundedSourceTest.java | 4 +- runners/direct-java/pom.xml | 115 +-- .../runners/direct/AggregatorContainer.java | 20 +- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 16 +- .../beam/runners/direct/DirectGroupByKey.java | 42 +- .../direct/DirectGroupByKeyOverrideFactory.java | 25 +- .../beam/runners/direct/DirectRunner.java | 7 +- .../runners/direct/DirectTimerInternals.java| 13 + .../beam/runners/direct/EvaluationContext.java | 18 +- .../direct/ExecutorServiceParallelExecutor.java | 41 +- .../GroupAlsoByWindowEvaluatorFactory.java | 249 -- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 31 +- .../direct/ImmutableListBundleFactory.java | 21 +- .../direct/PTransformOverrideFactory.java | 8 +- .../beam/runners/direct/ParDoEvaluator.java | 28 +- .../runners/direct/ParDoEvaluatorFactory.java | 126 +++ .../direct/ParDoMultiEvaluatorFactory.java | 107 --- .../direct/ParDoMultiEvaluatorHooks.java| 55 ++ .../runners/direct/ParDoOverrideFactory.java| 14 +- .../direct/ParDoSingleEvaluatorFactory.java | 110 --- .../direct/ParDoSingleEvaluatorHooks.java | 58 ++ .../direct/TestStreamEvaluatorFactory.java | 14 +- .../direct/TransformEvaluatorRegistry.java | 10 +- .../direct/UncommittedBundleOutputManager.java | 50 --
[17/50] incubator-beam git commit: Added Regex Transform and test.
Added Regex Transform and test. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc28799d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc28799d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc28799d Branch: refs/heads/gearpump-runner Commit: bc28799d575341d4ab359c971a10514f518249a0 Parents: 70255d2 Author: Jesse AndersonAuthored: Mon Jun 20 16:44:43 2016 -0700 Committer: Jesse Anderson Committed: Mon Nov 7 09:39:29 2016 -0800 -- .../beam/sdk/transforms/RegexTransform.java | 505 +++ .../beam/sdk/transforms/RegexTransformTest.java | 262 ++ 2 files changed, 767 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc28799d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java new file mode 100644 index 000..bd7848a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@code PTransorm}s to use Regular Expressions to process elements in a + * {@link PCollection}. + * + * + * {@link RegexTransform#matches(String, int)} can be used to see if an entire line matches + * a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if an entire + * line matches a Regex and output certain groups as a {@link KV}. + * + * + * {@link RegexTransform#find(String, int)} can be used to see if a portion of a line + * matches a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if a + * portion of a line matches a Regex and output certain groups as a {@link KV}. + * + * + * Lines that do not match the Regex will not be output. + * + */ +public class RegexTransform { + private RegexTransform() { +// do not instantiate + } + + /** + * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the entire line (group 0) as a + * {@link PCollection}. + * @param regex + * The regular expression to run + */ + public static Matches matches(String regex) { +return matches(regex, 0); + } + + /** + * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the group as a + * {@link PCollection}. + * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Matches matches(String regex, int group) { +return new Matches(regex, group); + } + + /** + * Returns a {@link RegexTransform.MatchesKV} {@link PTransform} that checks + * if the entire line matches the Regex. Returns the specified groups as the + * key and value as a {@link PCollection}. + * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use the value + */ + public static MatchesKV matchesKV(String regex, int keyGroup, + int valueGroup) { +return new MatchesKV(regex, keyGroup, valueGroup); + } + + /** + * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the entire line (group 0) as + * a {@link PCollection}. + * @param regex + * The regular expression to run + */ + public static Find
[38/50] incubator-beam git commit: Generalize extraction of DoFn parameters from context
Generalize extraction of DoFn parameters from context Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/40ff9d40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/40ff9d40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/40ff9d40 Branch: refs/heads/gearpump-runner Commit: 40ff9d401f0ba3f85d1bab848d1c6a662b03bc99 Parents: ac252a7 Author: Kenneth KnowlesAuthored: Thu Nov 3 18:42:25 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../sdk/transforms/reflect/DoFnInvokers.java| 115 --- .../sdk/transforms/reflect/DoFnSignature.java | 11 +- .../sdk/transforms/reflect/DoFnSignatures.java | 15 ++- .../DoFnSignaturesSplittableDoFnTest.java | 5 +- 4 files changed, 94 insertions(+), 52 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/40ff9d40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index c5a23dc..ad2b766 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -69,6 +69,13 @@ import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BoundedWindowParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.TypeDescriptor; @@ -496,40 +503,76 @@ public class DoFnInvokers { } } + private static StackManipulation simpleExtraContextParameter( +String methodName, +StackManipulation pushExtraContextFactory) { +try { + return new StackManipulation.Compound( +pushExtraContextFactory, +MethodInvocation.invoke( +new MethodDescription.ForLoadedMethod( +DoFn.ExtraContextFactory.class.getMethod(methodName; +} catch (Exception e) { + throw new IllegalStateException( + String.format( + "Failed to locate required method %s.%s", + ExtraContextFactory.class.getSimpleName(), methodName), + e); +} + } + + private static StackManipulation getExtraContextParameter( + DoFnSignature.Parameter parameter, + final StackManipulation pushExtraContextFactory) { + +return parameter.match(new Cases() { + + @Override + public StackManipulation dispatch(BoundedWindowParameter p) { +return simpleExtraContextParameter("window", pushExtraContextFactory); + } + + @Override + public StackManipulation dispatch(InputProviderParameter p) { +return simpleExtraContextParameter("inputProvider", pushExtraContextFactory); + } + + @Override + public StackManipulation dispatch(OutputReceiverParameter p) { +return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory); + } + + @Override + public StackManipulation dispatch(RestrictionTrackerParameter p) { +// ExtraContextFactory.restrictionTracker() returns a RestrictionTracker, +// but the @ProcessElement method expects a concrete subtype of it. +// Insert a downcast. +return new StackManipulation.Compound( +simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory), +TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType(; + } + + @Override + public StackManipulation dispatch(StateParameter p) { +throw new UnsupportedOperationException("State parameters are not yet supported."); + } + + @Override + public StackManipulation
[34/50] incubator-beam git commit: This closes #1300
This closes #1300 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ac252a7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ac252a7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ac252a7e Branch: refs/heads/gearpump-runner Commit: ac252a7e151f4287bc349f9d657eae15c0b3e0fc Parents: 912500f ff7fe07 Author: Thomas GrohAuthored: Mon Nov 7 15:08:44 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 7 15:08:44 2016 -0800 -- .../direct/UnboundedReadEvaluatorFactory.java| 6 -- .../UnboundedReadEvaluatorFactoryTest.java | 19 +-- 2 files changed, 17 insertions(+), 8 deletions(-) --
[24/50] incubator-beam git commit: Closes #1295
Closes #1295 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b5f84735 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b5f84735 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b5f84735 Branch: refs/heads/gearpump-runner Commit: b5f847355f2961b992abc3f8b4df71f0d96bb025 Parents: 1102455 e1291ee Author: Dan HalperinAuthored: Mon Nov 7 11:40:38 2016 -0800 Committer: Dan Halperin Committed: Mon Nov 7 11:40:38 2016 -0800 -- examples/java/pom.xml | 13 + examples/java8/pom.xml | 13 + examples/pom.xml | 5 + runners/core-java/pom.xml | 12 runners/direct-java/pom.xml| 13 + runners/flink/pom.xml | 9 + runners/google-cloud-dataflow-java/pom.xml | 13 + runners/pom.xml| 5 + runners/spark/pom.xml | 9 + sdks/java/core/pom.xml | 5 - .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java| 4 ++-- sdks/java/io/jms/pom.xml | 13 + sdks/java/io/kafka/pom.xml | 13 + sdks/java/io/kinesis/pom.xml | 13 + sdks/java/io/mongodb/pom.xml | 13 + sdks/java/microbenchmarks/pom.xml | 13 + sdks/pom.xml | 5 + 17 files changed, 164 insertions(+), 7 deletions(-) --
[42/50] incubator-beam git commit: Track Minimum Element Timestamps within Bundles
Track Minimum Element Timestamps within Bundles This allows the Watermark Manager to track pending elements by bundles of elements rather than per-element, which significantly reduces the amount of work done per-element to track watermarks. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a58f1eba Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a58f1eba Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a58f1eba Branch: refs/heads/gearpump-runner Commit: a58f1eba1a3b5ae453f4d65ad409785cb717b2ae Parents: 317b5e6 Author: Thomas GrohAuthored: Fri Nov 4 12:54:00 2016 -0700 Committer: Thomas Groh Committed: Mon Nov 7 15:47:02 2016 -0800 -- .../beam/runners/direct/DirectRunner.java | 5 ++ .../direct/ImmutableListBundleFactory.java | 21 - .../beam/runners/direct/WatermarkManager.java | 48 +--- .../direct/ImmutableListBundleFactoryTest.java | 15 +- 4 files changed, 60 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 44d1986..4d5a449 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -151,6 +151,11 @@ public class DirectRunner Iterable getElements(); /** + * Returns the minimum timestamp among all of the elements of this {@link CommittedBundle}. + */ +Instant getMinTimestamp(); + +/** * Returns the processing time output watermark at the time the producing {@link PTransform} * committed this bundle. Downstream synchronized processing time watermarks cannot progress * past this point before consuming this bundle. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a58f1eba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index abc6dd8..6b342d6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Instant; @@ -64,6 +65,7 @@ class ImmutableListBundleFactory implements BundleFactory { private final StructuralKey key; private boolean committed = false; private ImmutableList.Builder elements; +private Instant minSoFar = BoundedWindow.TIMESTAMP_MAX_VALUE; /** * Create a new {@link UncommittedImmutableListBundle} for the specified {@link PCollection}. @@ -93,6 +95,9 @@ class ImmutableListBundleFactory implements BundleFactory { element, pcollection); elements.add(element); + if (element.getTimestamp().isBefore(minSoFar)) { +minSoFar = element.getTimestamp(); + } return this; } @@ -102,7 +107,7 @@ class ImmutableListBundleFactory implements BundleFactory { committed = true; final Iterable committedElements = elements.build(); return CommittedImmutableListBundle.create( - pcollection, key, committedElements, synchronizedCompletionTime); + pcollection, key, committedElements, minSoFar, synchronizedCompletionTime); } } @@ -112,9 +117,10 @@ class ImmutableListBundleFactory implements BundleFactory { @Nullable PCollection pcollection, StructuralKey key, Iterable committedElements, +Instant minElementTimestamp, Instant synchronizedCompletionTime) { return new AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle<>( - pcollection, key,
[29/50] incubator-beam git commit: [BEAM-929] Fix Findbugs issues in Kinesis
[BEAM-929] Fix Findbugs issues in Kinesis * Fix equals and hashcode * Add tests * Remove Serializable from KinesisRecord, as it is in fact coded not serialized Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/367fcac6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/367fcac6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/367fcac6 Branch: refs/heads/gearpump-runner Commit: 367fcac68b38cc613525815181707a071eb8a51c Parents: 9b47228 Author: Dan HalperinAuthored: Mon Nov 7 12:31:19 2016 -0800 Committer: Dan Halperin Committed: Mon Nov 7 13:33:04 2016 -0800 -- sdks/java/io/kinesis/pom.xml| 21 ++-- .../beam/sdk/io/kinesis/CustomOptional.java | 21 .../beam/sdk/io/kinesis/KinesisRecord.java | 3 +-- .../beam/sdk/io/kinesis/CustomOptionalTest.java | 10 ++ 4 files changed, 34 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/367fcac6/sdks/java/io/kinesis/pom.xml -- diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml index e0b57db..36c7039 100644 --- a/sdks/java/io/kinesis/pom.xml +++ b/sdks/java/io/kinesis/pom.xml @@ -30,19 +30,6 @@ Library to read Kinesis streams. - - - - - org.codehaus.mojo - findbugs-maven-plugin - -true - - - - - org.apache.maven.plugins @@ -139,7 +126,6 @@ annotations - junit junit @@ -160,6 +146,12 @@ + com.google.guava + guava-testlib + test + + + org.hamcrest hamcrest-all test @@ -171,6 +163,5 @@ ${project.version} test - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/367fcac6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java -- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java index 4317a59..4bed0e3 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.kinesis; import java.util.NoSuchElementException; +import java.util.Objects; /** * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element. @@ -53,17 +54,19 @@ abstract class CustomOptional { return value; } - @Override public boolean equals(Object o) { -Present present = (Present) o; +if (!(o instanceof Present)) { +return false; +} -return value != null ? value.equals(present.value) : present.value == null; +Present present = (Present) o; +return Objects.equals(value, present.value); } @Override public int hashCode() { -return value != null ? value.hashCode() : 0; +return Objects.hash(value); } } @@ -82,5 +85,15 @@ abstract class CustomOptional { public T get() { throw new NoSuchElementException(); } + +@Override +public boolean equals(Object o) { +return o instanceof Absent; +} + +@Override +public int hashCode() { +return 0; +} } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/367fcac6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java -- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java index fe2a33d..02b5370 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.google.common.base.Charsets; -import java.io.Serializable; import java.nio.ByteBuffer;
[05/50] incubator-beam git commit: Closes #1284
Closes #1284 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6e1e57b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6e1e57b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6e1e57b0 Branch: refs/heads/gearpump-runner Commit: 6e1e57b09f4fbab021eb0ebb0748accc8bd90d0a Parents: f06deac f19a25d Author: Dan HalperinAuthored: Fri Nov 4 10:56:40 2016 -0700 Committer: Dan Halperin Committed: Fri Nov 4 10:56:40 2016 -0700 -- pom.xml | 4 sdks/java/pom.xml | 5 + 2 files changed, 1 insertion(+), 8 deletions(-) --
[40/50] incubator-beam git commit: This closes #1282
This closes #1282 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9de9ce69 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9de9ce69 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9de9ce69 Branch: refs/heads/gearpump-runner Commit: 9de9ce69fa010f46911ac0f9cfbe4df2e475772b Parents: ac252a7 9c3e59f Author: Kenneth KnowlesAuthored: Mon Nov 7 15:25:04 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:04 2016 -0800 -- .../beam/runners/core/SimpleDoFnRunner.java | 12 ++ .../beam/runners/core/SplittableParDo.java | 12 ++ .../org/apache/beam/sdk/transforms/DoFn.java| 20 +++ .../beam/sdk/transforms/DoFnAdapters.java | 22 +++ .../org/apache/beam/sdk/transforms/ParDo.java | 43 +- .../sdk/transforms/reflect/DoFnInvokers.java| 143 +-- .../sdk/transforms/reflect/DoFnSignature.java | 67 ++--- .../sdk/transforms/reflect/DoFnSignatures.java | 56 +++- .../beam/sdk/transforms/windowing/WindowFn.java | 12 ++ .../apache/beam/sdk/transforms/ParDoTest.java | 61 .../transforms/reflect/DoFnInvokersTest.java| 100 - .../DoFnSignaturesSplittableDoFnTest.java | 5 +- .../transforms/reflect/DoFnSignaturesTest.java | 3 +- 13 files changed, 445 insertions(+), 111 deletions(-) --
[21/50] incubator-beam git commit: This closes #1296
This closes #1296 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11024552 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11024552 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11024552 Branch: refs/heads/gearpump-runner Commit: 110245526d204287ffb30d9b8e91aca4003542f6 Parents: f6a9733 79b0455 Author: Kenneth KnowlesAuthored: Mon Nov 7 10:28:21 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 10:28:21 2016 -0800 -- .../org/apache/beam/sdk/transforms/Regex.java | 454 + .../beam/sdk/transforms/RegexTransform.java | 505 --- .../apache/beam/sdk/transforms/RegexTest.java | 248 + .../beam/sdk/transforms/RegexTransformTest.java | 262 -- 4 files changed, 702 insertions(+), 767 deletions(-) --
[13/50] incubator-beam git commit: Fix javadoc throughout Beam
Fix javadoc throughout Beam Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/717b431c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/717b431c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/717b431c Branch: refs/heads/gearpump-runner Commit: 717b431c1ac4bad279944c2a1a406b3f08ee Parents: aae65db Author: Dan HalperinAuthored: Fri Nov 4 18:28:53 2016 -0700 Committer: Dan Halperin Committed: Sat Nov 5 01:30:40 2016 -0700 -- .../beam/examples/complete/game/GameStats.java | 3 +- .../examples/complete/game/LeaderBoard.java | 2 +- .../beam/examples/complete/game/UserScore.java | 2 +- .../flink/examples/streaming/AutoComplete.java | 8 ++--- .../flink/examples/streaming/JoinExamples.java | 12 +++ .../examples/streaming/KafkaIOExamples.java | 2 +- .../examples/streaming/WindowedWordCount.java | 8 ++--- .../runners/dataflow/DataflowPipelineJob.java | 1 - .../runners/dataflow/internal/IsmFormat.java| 5 +-- .../runners/dataflow/util/MonitoringUtil.java | 1 - .../runners/dataflow/util/RandomAccessData.java | 2 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 4 +-- .../org/apache/beam/sdk/io/BoundedSource.java | 11 +++--- .../apache/beam/sdk/io/CompressedSource.java| 2 -- .../org/apache/beam/sdk/io/FileBasedSink.java | 9 ++--- .../java/org/apache/beam/sdk/io/XmlSource.java | 2 +- .../apache/beam/sdk/io/range/ByteKeyRange.java | 2 +- .../apache/beam/sdk/options/ValueProvider.java | 2 +- .../apache/beam/sdk/transforms/Aggregator.java | 12 +++ .../apache/beam/sdk/transforms/CombineFns.java | 4 +-- .../org/apache/beam/sdk/transforms/DoFn.java| 22 ++-- .../org/apache/beam/sdk/transforms/ParDo.java | 2 +- .../org/apache/beam/sdk/transforms/View.java| 2 +- .../sdk/transforms/display/DisplayData.java | 5 ++- .../beam/sdk/transforms/join/CoGroupByKey.java | 6 ++-- .../sdk/transforms/reflect/DoFnSignature.java | 2 +- .../splittabledofn/RestrictionTracker.java | 2 +- .../transforms/splittabledofn/package-info.java | 4 +-- .../beam/sdk/util/BaseExecutionContext.java | 2 +- .../BufferedElementCountingOutputStream.java| 4 +-- .../apache/beam/sdk/util/ExecutionContext.java | 8 ++--- .../sdk/util/ExposedByteArrayInputStream.java | 5 +-- .../sdk/util/ExposedByteArrayOutputStream.java | 2 -- .../java/org/apache/beam/sdk/util/GcsUtil.java | 2 -- .../apache/beam/sdk/util/MovingFunction.java| 4 +-- .../org/apache/beam/sdk/util/PubsubClient.java | 32 +++--- .../org/apache/beam/sdk/util/StringUtils.java | 8 ++--- .../apache/beam/sdk/util/TimerInternals.java| 8 +++-- .../beam/sdk/util/state/StateNamespace.java | 16 - .../java/org/apache/beam/sdk/values/PDone.java | 2 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 35 +--- 42 files changed, 109 insertions(+), 160 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/717b431c/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java -- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index 5ebf892..2b5255f 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -100,7 +100,8 @@ public class GameStats extends LeaderBoard { /** * Filter out all but those users with a high clickrate, which we will consider as 'spammy' uesrs. * We do this by finding the mean total score per user, then using that information as a side - * input to filter out all but those user scores that are > (mean * SCORE_WEIGHT) + * input to filter out all but those user scores that are larger than + * {@code (mean * SCORE_WEIGHT)}. */ // [START DocInclude_AbuseDetect] public static class CalculateSpammyUsers http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/717b431c/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java -- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index d5e3345..0a3dfb3 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++
[32/50] incubator-beam git commit: [BEAM-898] Fix Jenkins Build Failure - IT Options Conflicts
[BEAM-898] Fix Jenkins Build Failure - IT Options Conflicts This closes #1301 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/912500f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/912500f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/912500f1 Branch: refs/heads/gearpump-runner Commit: 912500f13a1c5d6cc18752567bddd137c54795d1 Parents: 3e84a5f 1927968 Author: Luke CwikAuthored: Mon Nov 7 13:46:14 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 7 13:46:14 2016 -0800 -- .../apache/beam/examples/WindowedWordCountIT.java | 16 +++- .../java/org/apache/beam/examples/WordCountIT.java | 17 +++-- .../examples/cookbook/BigQueryTornadoesIT.java | 16 +++- 3 files changed, 21 insertions(+), 28 deletions(-) --
[10/50] incubator-beam git commit: [BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs.
[BEAM-889] Let Spark handle the user-provided checkpointDir, but warn if not a reliable fs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a75d1f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a75d1f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a75d1f Branch: refs/heads/gearpump-runner Commit: 90a75d1fb0706ec4cc25a9eeeca8ade1b3b7de28 Parents: 46fbfe0 Author: SelaAuthored: Thu Nov 3 18:22:20 2016 +0200 Committer: Sela Committed: Fri Nov 4 23:59:40 2016 +0200 -- .../runners/spark/SparkPipelineOptions.java | 3 +-- .../SparkRunnerStreamingContextFactory.java | 23 +--- .../streaming/EmptyStreamAssertionTest.java | 3 +-- .../streaming/FlattenStreamingTest.java | 6 ++--- .../streaming/KafkaStreamingTest.java | 6 ++--- .../ResumeFromCheckpointStreamingTest.java | 3 +-- .../streaming/SimpleStreamingWordCountTest.java | 3 +-- .../utils/TestOptionsForStreaming.java | 12 +- 8 files changed, 19 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 08e14fe..4eada35 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -77,8 +77,7 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, class TmpCheckpointDirFactory implements DefaultValueFactory { @Override public String create(PipelineOptions options) { - SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class); - return "file:///tmp/" + sparkPipelineOptions.getJobName(); + return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a75d1f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index 2378788..a670f61 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -20,11 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming; import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Arrays; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.translation.SparkContextFactory; @@ -48,7 +43,7 @@ import org.slf4j.LoggerFactory; public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory { private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class); - private static final Iterable KNOWN_RELIABLE_FS = Arrays.asList("hdfs", "s3", "gs"); + private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)"; private final Pipeline pipeline; private final SparkPipelineOptions options; @@ -83,19 +78,11 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF // set checkpoint dir. String checkpointDir = options.getCheckpointDir(); -LOG.info("Checkpoint dir set to: {}", checkpointDir); -try { - // validate checkpoint dir and warn if not of a known durable filesystem. - URL checkpointDirUrl = new URL(checkpointDir); - if (!Iterables.any(KNOWN_RELIABLE_FS, Predicates.equalTo(checkpointDirUrl.getProtocol( { -LOG.warn("Checkpoint dir URL {} does not match a reliable filesystem, in case of failures " -+ "this job may not recover properly or even at all.", checkpointDirUrl); - } -} catch (MalformedURLException e) { - throw new
[45/50] incubator-beam git commit: [BEAM-917] ExpectedLogs: clear saved records after each test.
[BEAM-917] ExpectedLogs: clear saved records after each test. This closes #1289 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/339dee95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/339dee95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/339dee95 Branch: refs/heads/gearpump-runner Commit: 339dee9542497d845873dbd939c7868bdd9c0835 Parents: c6d9bf2 6bf729e Author: Luke CwikAuthored: Mon Nov 7 17:38:06 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 7 17:38:06 2016 -0800 -- .../apache/beam/sdk/testing/ExpectedLogs.java | 15 ++ .../beam/sdk/testing/ExpectedLogsTest.java | 30 +++- 2 files changed, 39 insertions(+), 6 deletions(-) --
[28/50] incubator-beam git commit: Closes #1298
Closes #1298 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9b47228c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9b47228c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9b47228c Branch: refs/heads/gearpump-runner Commit: 9b47228c3046ac2e1f45d307f44bce4ab1128c1c Parents: baa7fb0 0604d2b Author: Dan HalperinAuthored: Mon Nov 7 13:31:48 2016 -0800 Committer: Dan Halperin Committed: Mon Nov 7 13:31:48 2016 -0800 -- sdks/java/extensions/sorter/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) --
[08/50] incubator-beam git commit: Remove @Default from IT options
Remove @Default from IT options Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1eccd29b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1eccd29b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1eccd29b Branch: refs/heads/gearpump-runner Commit: 1eccd29b72aedc87e69763fbcc828b5694335e1f Parents: 99062d1 Author: Mark LiuAuthored: Thu Nov 3 16:45:43 2016 -0700 Committer: Luke Cwik Committed: Fri Nov 4 14:19:43 2016 -0700 -- .../beam/examples/WindowedWordCountIT.java | 11 --- .../org/apache/beam/examples/WordCountIT.java| 19 +++ .../examples/cookbook/BigQueryTornadoesIT.java | 11 --- 3 files changed, 27 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eccd29b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 6742654..d545ad2 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples; +import com.google.common.base.Strings; import java.io.IOException; import org.apache.beam.examples.WindowedWordCount.Options; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.BigqueryMatcher; @@ -37,12 +37,13 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class WindowedWordCountIT { + private static final String DEFAULT_OUTPUT_CHECKSUM = "ff54f6f42b2afeb146206c1e8e915deaee0362b4"; + /** * Options for the {@link WindowedWordCount} Integration Test. */ public interface WindowedWordCountITOptions extends Options, TestPipelineOptions, StreamingOptions { -@Default.String("ff54f6f42b2afeb146206c1e8e915deaee0362b4") String getChecksum(); void setChecksum(String value); } @@ -66,9 +67,13 @@ public class WindowedWordCountIT { String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word", options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); +String outputChecksum = +Strings.isNullOrEmpty(options.getChecksum()) +? DEFAULT_OUTPUT_CHECKSUM +: options.getChecksum(); options.setOnSuccessMatcher( new BigqueryMatcher( -options.getAppName(), options.getProject(), query, options.getChecksum())); +options.getAppName(), options.getProject(), query, outputChecksum)); WindowedWordCount.main(TestPipeline.convertToArgs(options)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eccd29b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java index 2f2ea46..8f170af 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java @@ -18,9 +18,9 @@ package org.apache.beam.examples; +import com.google.common.base.Strings; import java.util.Date; import org.apache.beam.examples.WordCount.WordCountOptions; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.FileChecksumMatcher; import org.apache.beam.sdk.testing.TestPipeline; @@ -36,6 +36,8 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class WordCountIT { + private static final String DEFAULT_OUTPUT_CHECKSUM = "8ae94f799f97cfd1cb5e8125951b32dfb52e1f12"; + /** * Options for the WordCount Integration Test. * @@ -43,9 +45,8 @@ public class WordCountIT { * with customized input. */ public interface WordCountITOptions extends TestPipelineOptions, WordCountOptions { -@Default.String("c04722202dee29c442b55ead54c6000693e85e77") -String getOutputChecksum(); -void setOutputChecksum(String value); +String getChecksum(); +void setChecksum(String value); } @Test @@ -58,11 +59,13 @@ public class WordCountIT { String.format("WordCountIT-%tF-%
[50/50] incubator-beam git commit: This closes #1306
This closes #1306 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a14927f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a14927f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a14927f4 Branch: refs/heads/gearpump-runner Commit: a14927f400d8d28f1cff3b21e384a66c647ae3a5 Parents: 323ec11 2a96a17 Author: Kenneth KnowlesAuthored: Mon Nov 7 19:33:32 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 19:33:32 2016 -0800 -- .travis.yml | 22 +- examples/java/pom.xml | 92 +-- .../beam/examples/DebuggingWordCount.java | 2 +- .../apache/beam/examples/MinimalWordCount.java | 2 +- .../org/apache/beam/examples/WordCount.java | 2 +- .../beam/examples/common/ExampleUtils.java | 3 +- .../apache/beam/examples/complete/TfIdf.java| 8 +- .../examples/complete/TopWikipediaSessions.java | 2 +- .../examples/cookbook/BigQueryTornadoes.java| 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/DeDupExample.java| 10 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../beam/examples/cookbook/JoinExamples.java| 2 +- .../examples/cookbook/MaxPerKeyExamples.java| 2 +- .../org/apache/beam/examples/cookbook/README.md | 2 +- .../beam/examples/WindowedWordCountIT.java | 15 +- .../org/apache/beam/examples/WordCountIT.java | 18 +- .../org/apache/beam/examples/WordCountTest.java | 2 +- .../examples/complete/AutoCompleteTest.java | 6 +- .../beam/examples/complete/TfIdfTest.java | 6 +- .../complete/TopWikipediaSessionsTest.java | 2 +- .../examples/cookbook/BigQueryTornadoesIT.java | 15 +- .../examples/cookbook/DeDupExampleTest.java | 82 -- .../examples/cookbook/DistinctExampleTest.java | 82 ++ .../examples/cookbook/JoinExamplesTest.java | 2 +- .../examples/cookbook/TriggerExampleTest.java | 2 +- examples/java8/pom.xml | 32 +- .../beam/examples/MinimalWordCountJava8.java| 2 +- .../beam/examples/complete/game/GameStats.java | 55 +- .../examples/complete/game/HourlyTeamScore.java | 30 +- .../examples/complete/game/LeaderBoard.java | 45 +- .../beam/examples/complete/game/UserScore.java | 18 +- .../complete/game/utils/WriteToBigQuery.java| 49 +- .../game/utils/WriteWindowedToBigQuery.java | 14 +- .../examples/MinimalWordCountJava8Test.java | 2 +- .../examples/complete/game/GameStatsTest.java | 2 +- .../complete/game/HourlyTeamScoreTest.java | 2 +- .../examples/complete/game/LeaderBoardTest.java | 10 +- .../examples/complete/game/UserScoreTest.java | 6 +- examples/pom.xml| 18 +- pom.xml | 123 ++- runners/core-java/pom.xml | 77 +- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 3 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 3 +- .../core/GroupByKeyViaGroupByKeyOnly.java | 13 +- .../core/PushbackSideInputDoFnRunner.java | 28 +- .../beam/runners/core/SimpleDoFnRunner.java | 12 + .../beam/runners/core/SplittableParDo.java | 12 + .../beam/runners/core/ReduceFnTester.java | 6 +- .../UnboundedReadFromBoundedSourceTest.java | 4 +- runners/direct-java/pom.xml | 115 +-- .../runners/direct/AggregatorContainer.java | 20 +- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 16 +- .../beam/runners/direct/DirectGroupByKey.java | 42 +- .../direct/DirectGroupByKeyOverrideFactory.java | 25 +- .../beam/runners/direct/DirectRunner.java | 7 +- .../runners/direct/DirectTimerInternals.java| 13 + .../beam/runners/direct/EvaluationContext.java | 18 +- .../direct/ExecutorServiceParallelExecutor.java | 41 +- .../GroupAlsoByWindowEvaluatorFactory.java | 249 -- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 31 +- .../direct/ImmutableListBundleFactory.java | 21 +- .../direct/PTransformOverrideFactory.java | 8 +- .../beam/runners/direct/ParDoEvaluator.java | 28 +- .../runners/direct/ParDoEvaluatorFactory.java | 126 +++ .../direct/ParDoMultiEvaluatorFactory.java | 107 --- .../direct/ParDoMultiEvaluatorHooks.java| 55 ++ .../runners/direct/ParDoOverrideFactory.java| 14 +- .../direct/ParDoSingleEvaluatorFactory.java | 110 --- .../direct/ParDoSingleEvaluatorHooks.java | 58 ++ .../direct/TestStreamEvaluatorFactory.java | 14 +- .../direct/TransformEvaluatorRegistry.java | 10 +- .../direct/UncommittedBundleOutputManager.java | 50 -- .../runners/direct/ViewEvaluatorFactory.java| 19 +-
[44/50] incubator-beam git commit: [BEAM-917] ExpectedLogs: clear saved records after each test.
[BEAM-917] ExpectedLogs: clear saved records after each test. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6bf729e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6bf729e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6bf729e9 Branch: refs/heads/gearpump-runner Commit: 6bf729e90c38e910138d332c994325223c220abd Parents: c6d9bf2 Author: Pei HeAuthored: Fri Nov 4 18:45:47 2016 -0700 Committer: Luke Cwik Committed: Mon Nov 7 17:36:00 2016 -0800 -- .../apache/beam/sdk/testing/ExpectedLogs.java | 15 ++ .../beam/sdk/testing/ExpectedLogsTest.java | 30 +++- 2 files changed, 39 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6bf729e9/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java index a8e3f94..3e51f34 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java @@ -268,6 +268,7 @@ public class ExpectedLogs extends ExternalResource { protected void after() { log.removeHandler(logSaver); log.setLevel(previousLevel); +logSaver.reset(); } private final Logger log; @@ -285,11 +286,7 @@ public class ExpectedLogs extends ExternalResource { */ @ThreadSafe private static class LogSaver extends Handler { -Collection logRecords = new ConcurrentLinkedDeque<>(); - -public Collection getLogs() { - return logRecords; -} +private final Collection logRecords = new ConcurrentLinkedDeque<>(); @Override public void publish(LogRecord record) { @@ -301,5 +298,13 @@ public class ExpectedLogs extends ExternalResource { @Override public void close() throws SecurityException {} + +private Collection getLogs() { + return logRecords; +} + +private void reset() { + logRecords.clear(); +} } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6bf729e9/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java index 84d5584..1762d0d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.testing; import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -30,8 +31,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.Description; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.model.Statement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +42,6 @@ import org.slf4j.LoggerFactory; @RunWith(JUnit4.class) public class ExpectedLogsTest { private static final Logger LOG = LoggerFactory.getLogger(ExpectedLogsTest.class); - private Random random = new Random(); @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(ExpectedLogsTest.class); @@ -146,6 +148,32 @@ public class ExpectedLogsTest { } } + @Test + public void testLogsCleared() throws Throwable { +final String messageUnexpected = "Message prior to ExpectedLogs."; +final String messageExpected = "Message expected."; +LOG.info(messageUnexpected); + +expectedLogs = ExpectedLogs.none(ExpectedLogsTest.class); +final boolean[] evaluateRan = new boolean[1]; + +expectedLogs.apply( +new Statement() { + @Override + public void evaluate() throws Throwable { +evaluateRan[0] = true; +expectedLogs.verifyNotLogged(messageUnexpected); +LOG.info(messageExpected); +expectedLogs.verifyInfo(messageExpected); + } +}, +Description.EMPTY).evaluate(); +assertTrue(evaluateRan[0]); +// Verify expectedLogs is cleared. +expectedLogs.verifyNotLogged(messageExpected); +expectedLogs.verifyNotLogged(messageUnexpected); + } + // Generates a random fake error message.
[07/50] incubator-beam git commit: Flatten FiredTimers and ExtractFiredTimers
Flatten FiredTimers and ExtractFiredTimers Pass a single collection of fired timers, and have those objects contain the associated transform and key that they fired for. Timers already contain the domain they are in. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5dca2674 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5dca2674 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5dca2674 Branch: refs/heads/gearpump-runner Commit: 5dca2674a8d145c6e619005c2282c6064cd7aab7 Parents: 6e1e57b Author: Thomas GrohAuthored: Thu Nov 3 14:10:37 2016 -0700 Committer: Thomas Groh Committed: Fri Nov 4 13:05:21 2016 -0700 -- .../beam/runners/direct/EvaluationContext.java | 6 +- .../direct/ExecutorServiceParallelExecutor.java | 41 .../beam/runners/direct/WatermarkManager.java | 79 -- .../runners/direct/EvaluationContextTest.java | 23 ++--- .../runners/direct/WatermarkManagerTest.java| 102 ++- 5 files changed, 109 insertions(+), 142 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 965e77d..b814def 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -391,11 +391,9 @@ class EvaluationContext { * This is a destructive operation. Timers will only appear in the result of this method once * for each time they are set. */ - public Map > extractFiredTimers() { + public Collection extractFiredTimers() { forceRefresh(); -Map > fired = -watermarkManager.extractFiredTimers(); -return fired; +return watermarkManager.extractFiredTimers(); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index e32f671..d1ffea1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -440,29 +439,23 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { */ private void fireTimers() throws Exception { try { -for (Map.Entry< - AppliedPTransform, Map > transformTimers : -evaluationContext.extractFiredTimers().entrySet()) { - AppliedPTransform transform = transformTimers.getKey(); - for (Map.Entry keyTimers : - transformTimers.getValue().entrySet()) { -for (TimeDomain domain : TimeDomain.values()) { - Collection delivery = keyTimers.getValue().getTimers(domain); - if (delivery.isEmpty()) { -continue; - } - KeyedWorkItem work = - KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery); - @SuppressWarnings({"unchecked", "rawtypes"}) - CommittedBundle bundle = - evaluationContext - .createKeyedBundle(keyTimers.getKey(), (PCollection) transform.getInput()) - .add(WindowedValue.valueInGlobalWindow(work)) - .commit(evaluationContext.now()); - scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery)); -
[36/50] incubator-beam git commit: Add DoFnInvoker dispatch for State and Timer parameters
Add DoFnInvoker dispatch for State and Timer parameters Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2db8268 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2db8268 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2db8268 Branch: refs/heads/gearpump-runner Commit: e2db82686008aea224ca5cf1ef1acc2831c46ceb Parents: c052d2a Author: Kenneth KnowlesAuthored: Thu Nov 3 19:18:24 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../beam/runners/core/SimpleDoFnRunner.java | 12 +++ .../beam/runners/core/SplittableParDo.java | 12 +++ .../org/apache/beam/sdk/transforms/DoFn.java| 20 .../beam/sdk/transforms/DoFnAdapters.java | 22 .../sdk/transforms/reflect/DoFnInvokers.java| 104 +++ .../transforms/reflect/DoFnInvokersTest.java| 59 ++- 6 files changed, 187 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index dec9905..3abb06b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -48,11 +48,13 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -532,6 +534,16 @@ public class SimpleDoFnRunner implements DoFnRunner windowingInternals() { return new WindowingInternals () { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 33d0ab7..d8ee1d5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -46,9 +46,11 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; @@ -432,6 +434,16 @@ public class SplittableParDo< public TrackerT restrictionTracker() { return tracker; } + + @Override + public State state(String stateId) { +throw new UnsupportedOperationException("State cannot be used with a splittable DoFn"); + } + + @Override + public Timer timer(String timerId) { +throw new UnsupportedOperationException("Timers cannot be used with a splittable DoFn"); + } } }
[16/50] incubator-beam git commit: Closes #1292
Closes #1292 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70255d26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70255d26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70255d26 Branch: refs/heads/gearpump-runner Commit: 70255d26880ce6443b8a931a6a778040fdbc5339 Parents: 5864a38 68f2638 Author: Dan HalperinAuthored: Mon Nov 7 08:51:40 2016 -0800 Committer: Dan Halperin Committed: Mon Nov 7 08:51:40 2016 -0800 -- examples/java/pom.xml | 6 -- examples/java8/pom.xml | 6 -- pom.xml| 5 + runners/core-java/pom.xml | 6 -- runners/direct-java/pom.xml| 6 -- runners/flink/examples/pom.xml | 4 runners/flink/runner/pom.xml | 4 runners/google-cloud-dataflow-java/pom.xml | 6 -- runners/spark/pom.xml | 4 sdks/java/core/pom.xml | 6 -- sdks/java/extensions/join-library/pom.xml | 4 sdks/java/extensions/sorter/pom.xml| 4 sdks/java/io/google-cloud-platform/pom.xml | 4 sdks/java/io/hdfs/pom.xml | 4 sdks/java/io/jdbc/pom.xml | 4 sdks/java/io/jms/pom.xml | 4 sdks/java/io/kafka/pom.xml | 4 sdks/java/io/kinesis/pom.xml | 4 sdks/java/io/mongodb/pom.xml | 4 sdks/java/java8tests/pom.xml | 6 -- sdks/java/microbenchmarks/pom.xml | 5 - 21 files changed, 5 insertions(+), 95 deletions(-) --
[02/50] incubator-beam git commit: checkstyle: move from individual modules to root poms
checkstyle: move from individual modules to root poms Checkstyle config is no longer needed in individual modules; instead, move it only to sdks/ , runners/, and examples/ poms. Individual modules like sdks/java/core may still reference checkstyle to configure their own options. The only sketchy bit is that I had to link sdks/java/build-tools to beam-parent rather than sdks/java so that I didn't create a build dependency loop. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f50b2142 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f50b2142 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f50b2142 Branch: refs/heads/gearpump-runner Commit: f50b2142b641c6d2aca186903139cff995f7d49e Parents: faf55c7 Author: Dan HalperinAuthored: Thu Nov 3 17:24:52 2016 -0700 Committer: Dan Halperin Committed: Fri Nov 4 10:51:28 2016 -0700 -- examples/java/pom.xml | 5 - examples/java8/pom.xml| 5 - examples/pom.xml | 13 - pom.xml | 3 +++ runners/core-java/pom.xml | 5 - runners/direct-java/pom.xml | 5 - runners/flink/examples/pom.xml| 5 - runners/flink/runner/pom.xml | 5 - runners/google-cloud-dataflow-java/pom.xml| 7 --- runners/pom.xml | 11 +++ runners/spark/pom.xml | 4 sdks/java/build-tools/pom.xml | 4 ++-- .../src/main/resources/beam/checkstyle.xml| 2 +- sdks/java/core/pom.xml| 18 +- sdks/java/extensions/join-library/pom.xml | 4 sdks/java/extensions/sorter/pom.xml | 4 sdks/java/io/google-cloud-platform/pom.xml| 4 sdks/java/io/hdfs/pom.xml | 4 sdks/java/io/jdbc/pom.xml | 6 +- sdks/java/io/jms/pom.xml | 4 sdks/java/io/kafka/pom.xml| 4 sdks/java/io/kinesis/pom.xml | 4 sdks/java/io/mongodb/pom.xml | 4 sdks/java/java8tests/pom.xml | 5 - sdks/java/microbenchmarks/pom.xml | 5 - sdks/java/pom.xml | 4 +++- sdks/pom.xml | 16 +++- 27 files changed, 57 insertions(+), 103 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f50b2142/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index fc82ed4..12a114f 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -277,11 +277,6 @@ - -org.apache.maven.plugins -maven-checkstyle-plugin - - org.apache.maven.plugins http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f50b2142/examples/java8/pom.xml -- diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index e6408dc..18257d9 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -148,11 +148,6 @@ - -org.apache.maven.plugins -maven-checkstyle-plugin - - org.apache.maven.plugins http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f50b2142/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 2820473..c6f9cb3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -45,7 +45,18 @@ java8 - + + + release + + + +org.apache.maven.plugins +maven-checkstyle-plugin + + + + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f50b2142/pom.xml -- diff --git a/pom.xml b/pom.xml index ea7d4ae..28845a5 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,9 @@ pom + +sdks/java/build-tools sdks runners - -org.apache.maven.plugins -maven-checkstyle-plugin - - org.apache.maven.plugins maven-jar-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f50b2142/runners/pom.xml
[04/50] incubator-beam git commit: pom: move maven-archetypes to its proper spot in dep hierarchy
pom: move maven-archetypes to its proper spot in dep hierarchy Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f19a25da Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f19a25da Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f19a25da Branch: refs/heads/gearpump-runner Commit: f19a25da225b65ecdf43342ec48389a8348e68ce Parents: f06deac Author: Dan HalperinAuthored: Thu Nov 3 17:30:58 2016 -0700 Committer: Dan Halperin Committed: Fri Nov 4 10:56:39 2016 -0700 -- pom.xml | 4 sdks/java/pom.xml | 5 + 2 files changed, 1 insertion(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f19a25da/pom.xml -- diff --git a/pom.xml b/pom.xml index 28845a5..14c38fd 100644 --- a/pom.xml +++ b/pom.xml @@ -145,10 +145,6 @@ sdks/java/build-tools sdks runners - -sdks/java/maven-archetypes examples http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f19a25da/sdks/java/pom.xml -- diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index 2866cb3..fb07d25 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -38,10 +38,7 @@ build-tools --> core io - +maven-archetypes extensions microbenchmarks
[22/50] incubator-beam git commit: Disable all broken findbugs builds and link to JIRA issues
Disable all broken findbugs builds and link to JIRA issues Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e1291eea Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e1291eea Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e1291eea Branch: refs/heads/gearpump-runner Commit: e1291eea9e21dce4df44245d53a78664003c264a Parents: fae52a3 Author: Dan HalperinAuthored: Mon Nov 7 09:54:04 2016 -0800 Committer: Dan Halperin Committed: Mon Nov 7 10:43:11 2016 -0800 -- examples/java/pom.xml | 13 + examples/java8/pom.xml | 13 + runners/core-java/pom.xml | 12 runners/direct-java/pom.xml| 13 + runners/flink/pom.xml | 9 + runners/google-cloud-dataflow-java/pom.xml | 13 + runners/spark/pom.xml | 9 + .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java| 4 ++-- sdks/java/io/jms/pom.xml | 13 + sdks/java/io/kafka/pom.xml | 13 + sdks/java/io/kinesis/pom.xml | 13 + sdks/java/io/mongodb/pom.xml | 13 + sdks/java/microbenchmarks/pom.xml | 13 + 13 files changed, 149 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1291eea/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 5d69bfe..6d18a0f 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -260,6 +260,19 @@ + + + + + org.codehaus.mojo + findbugs-maven-plugin + +true + + + + + maven-compiler-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1291eea/examples/java8/pom.xml -- diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index 72f24ca..8591955 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -118,6 +118,19 @@ + + + + + org.codehaus.mojo + findbugs-maven-plugin + +true + + + + + maven-compiler-plugin http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1291eea/runners/core-java/pom.xml -- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 40ebf58..aa5f145 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -33,6 +33,18 @@ jar + + + + + org.codehaus.mojo + findbugs-maven-plugin + +true + + + + org.apache.maven.plugins http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1291eea/runners/direct-java/pom.xml -- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 8983b1c..43cf3c0 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -40,6 +40,19 @@ + + + + + org.codehaus.mojo + findbugs-maven-plugin + +true + + + + + org.apache.maven.plugins http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1291eea/runners/flink/pom.xml -- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index e012c4b..1b73922 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -95,6 +95,15 @@ + + + + org.codehaus.mojo + findbugs-maven-plugin + +true + + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1291eea/runners/google-cloud-dataflow-java/pom.xml -- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index a5ffe0e..59b1465 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -45,6 +45,19 @@ +
[25/50] incubator-beam git commit: [BEAM-725] Migrate to use the generic credentials library compatible with Apiary and gRPC instead of the Apiary only credentials library.
[BEAM-725] Migrate to use the generic credentials library compatible with Apiary and gRPC instead of the Apiary only credentials library. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb260ecd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb260ecd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb260ecd Branch: refs/heads/gearpump-runner Commit: bb260ecd34e14a29e7939912a101b3733e379248 Parents: b5f8473 Author: Luke CwikAuthored: Mon Nov 7 09:40:38 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 7 13:08:39 2016 -0800 -- pom.xml | 21 ++ runners/google-cloud-dataflow-java/pom.xml | 15 +- .../dataflow/util/DataflowTransport.java| 9 +- sdks/java/core/pom.xml | 19 +- .../org/apache/beam/sdk/options/GcpOptions.java | 136 ++--- .../beam/sdk/testing/BigqueryMatcher.java | 15 +- .../apache/beam/sdk/util/CredentialFactory.java | 4 +- .../org/apache/beam/sdk/util/Credentials.java | 192 --- .../beam/sdk/util/GcpCredentialFactory.java | 41 +++- .../beam/sdk/util/NoopCredentialFactory.java| 9 +- .../apache/beam/sdk/util/PubsubGrpcClient.java | 18 +- .../apache/beam/sdk/util/PubsubJsonClient.java | 9 +- .../apache/beam/sdk/util/TestCredential.java| 44 +++-- .../org/apache/beam/sdk/util/Transport.java | 9 +- .../beam/sdk/util/PubsubGrpcClientTest.java | 9 +- sdks/java/io/google-cloud-platform/pom.xml | 9 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 25 ++- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 25 ++- 18 files changed, 186 insertions(+), 423 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb260ecd/pom.xml -- diff --git a/pom.xml b/pom.xml index c1bd5c8..bd6037e 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,7 @@ 1.2.0 1.0-rc2 1.1 +0.6.0 1.22.0 1.4.5 0.5.160304 @@ -530,6 +531,26 @@ +com.google.auth +google-auth-library-credentials +${google-auth.version} + + + +com.google.auth +google-auth-library-oauth2-http +${google-auth.version} + + + +com.google.guava +guava-jdk5 + + + + + com.google.apis google-api-services-bigquery ${bigquery.version} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb260ecd/runners/google-cloud-dataflow-java/pom.xml -- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 59b1465..6ed41d0 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -171,11 +171,6 @@ - com.google.oauth-client - google-oauth-client - - - com.google.http-client google-http-client @@ -202,6 +197,16 @@ + com.google.auth + google-auth-library-credentials + + + + com.google.auth + google-auth-library-oauth2-http + + + com.google.cloud.bigdataoss util http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb260ecd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java index 0391594..e0026de 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java @@ -20,10 +20,11 @@ package org.apache.beam.runners.dataflow.util; import static org.apache.beam.sdk.util.Transport.getJsonFactory; import static org.apache.beam.sdk.util.Transport.getTransport; -import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.clouddebugger.v2.Clouddebugger; import com.google.api.services.dataflow.Dataflow; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.common.collect.ImmutableList;
[37/50] incubator-beam git commit: Rename BoundedWindowParameter -> WindowParameter
Rename BoundedWindowParameter -> WindowParameter Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/85b908be Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/85b908be Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/85b908be Branch: refs/heads/gearpump-runner Commit: 85b908be5b5858ff9dfb5d7f04c0e3ca28dbaa05 Parents: e2db826 Author: Kenneth KnowlesAuthored: Thu Nov 3 21:27:24 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../sdk/transforms/reflect/DoFnInvokers.java| 4 ++-- .../sdk/transforms/reflect/DoFnSignature.java | 24 ++-- .../transforms/reflect/DoFnSignaturesTest.java | 3 ++- 3 files changed, 16 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85b908be/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index b7f75ed..ba95f98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -68,13 +68,13 @@ import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BoundedWindowParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.UserCodeException; @@ -537,7 +537,7 @@ public class DoFnInvokers { new Cases() { @Override - public StackManipulation dispatch(BoundedWindowParameter p) { + public StackManipulation dispatch(WindowParameter p) { return simpleExtraContextParameter("window", pushExtraContextFactory); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85b908be/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index a189bd5..befc10b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -33,9 +33,9 @@ import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BoundedWindowParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Timer; @@ -159,8 +159,8 @@ public abstract class DoFnSignature { public ResultT match(Cases cases) { // This could be done with reflection, but since the number of cases is small and known, // they are simply inlined. - if (this instanceof BoundedWindowParameter) { -return cases.dispatch((BoundedWindowParameter) this); + if (this instanceof WindowParameter) { +return cases.dispatch((WindowParameter)
[23/50] incubator-beam git commit: move findbugs execution to release profile, enable in all modules
move findbugs execution to release profile, enable in all modules Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fae52a3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fae52a3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fae52a3f Branch: refs/heads/gearpump-runner Commit: fae52a3f8f4963524c7883450aa740aefaf4a9c6 Parents: 1102455 Author: Dan HalperinAuthored: Mon Nov 7 08:50:07 2016 -0800 Committer: Dan Halperin Committed: Mon Nov 7 10:43:11 2016 -0800 -- examples/pom.xml | 5 + runners/pom.xml| 5 + sdks/java/core/pom.xml | 5 - sdks/pom.xml | 5 + 4 files changed, 15 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae52a3f/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index c6f9cb3..eb42861 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -54,6 +54,11 @@ org.apache.maven.plugins maven-checkstyle-plugin + + +org.codehaus.mojo +findbugs-maven-plugin + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae52a3f/runners/pom.xml -- diff --git a/runners/pom.xml b/runners/pom.xml index 9c821cc..8084d0b 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -49,6 +49,11 @@ org.apache.maven.plugins maven-checkstyle-plugin + + +org.codehaus.mojo +findbugs-maven-plugin + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae52a3f/sdks/java/core/pom.xml -- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 7906afb..77a3309 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -147,11 +147,6 @@ - -org.codehaus.mojo -findbugs-maven-plugin - - org.jacoco http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fae52a3f/sdks/pom.xml -- diff --git a/sdks/pom.xml b/sdks/pom.xml index 6347fe1..29ccd37 100644 --- a/sdks/pom.xml +++ b/sdks/pom.xml @@ -45,6 +45,11 @@ org.apache.maven.plugins maven-checkstyle-plugin + + +org.codehaus.mojo +findbugs-maven-plugin +
[31/50] incubator-beam git commit: Fix IT Options Conflict
Fix IT Options Conflict Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/19279689 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/19279689 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/19279689 Branch: refs/heads/gearpump-runner Commit: 1927968955dfe834b3e48482a606ffa7fc403749 Parents: 3e84a5f Author: Mark LiuAuthored: Mon Nov 7 11:27:15 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 7 13:45:43 2016 -0800 -- .../apache/beam/examples/WindowedWordCountIT.java | 16 +++- .../java/org/apache/beam/examples/WordCountIT.java | 17 +++-- .../examples/cookbook/BigQueryTornadoesIT.java | 16 +++- 3 files changed, 21 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19279689/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index d545ad2..c78fad6 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -17,7 +17,6 @@ */ package org.apache.beam.examples; -import com.google.common.base.Strings; import java.io.IOException; import org.apache.beam.examples.WindowedWordCount.Options; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -26,6 +25,7 @@ import org.apache.beam.sdk.testing.BigqueryMatcher; import org.apache.beam.sdk.testing.StreamingIT; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -44,8 +44,11 @@ public class WindowedWordCountIT { */ public interface WindowedWordCountITOptions extends Options, TestPipelineOptions, StreamingOptions { -String getChecksum(); -void setChecksum(String value); + } + + @BeforeClass + public static void setUp() { +PipelineOptionsFactory.register(TestPipelineOptions.class); } @Test @@ -60,20 +63,15 @@ public class WindowedWordCountIT { } private void testWindowedWordCountPipeline(boolean isStreaming) throws IOException { -PipelineOptionsFactory.register(WindowedWordCountITOptions.class); WindowedWordCountITOptions options = TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class); options.setStreaming(isStreaming); String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word", options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); -String outputChecksum = -Strings.isNullOrEmpty(options.getChecksum()) -? DEFAULT_OUTPUT_CHECKSUM -: options.getChecksum(); options.setOnSuccessMatcher( new BigqueryMatcher( -options.getAppName(), options.getProject(), query, outputChecksum)); +options.getAppName(), options.getProject(), query, DEFAULT_OUTPUT_CHECKSUM)); WindowedWordCount.main(TestPipeline.convertToArgs(options)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19279689/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java index 8f170af..487f04b 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java @@ -18,7 +18,6 @@ package org.apache.beam.examples; -import com.google.common.base.Strings; import java.util.Date; import org.apache.beam.examples.WordCount.WordCountOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -26,6 +25,7 @@ import org.apache.beam.sdk.testing.FileChecksumMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.util.IOChannelUtils; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -45,13 +45,15 @@ public class WordCountIT { * with customized input. */ public interface WordCountITOptions extends TestPipelineOptions, WordCountOptions { -String getChecksum(); -void
[06/50] incubator-beam git commit: This closes #1276
This closes #1276 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99062d10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99062d10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99062d10 Branch: refs/heads/gearpump-runner Commit: 99062d103c96e47ea78fc866984195df12de7770 Parents: 6e1e57b 5dca267 Author: Thomas GrohAuthored: Fri Nov 4 13:05:21 2016 -0700 Committer: Thomas Groh Committed: Fri Nov 4 13:05:21 2016 -0700 -- .../beam/runners/direct/EvaluationContext.java | 6 +- .../direct/ExecutorServiceParallelExecutor.java | 41 .../beam/runners/direct/WatermarkManager.java | 79 -- .../runners/direct/EvaluationContextTest.java | 23 ++--- .../runners/direct/WatermarkManagerTest.java| 102 ++- 5 files changed, 109 insertions(+), 142 deletions(-) --
[39/50] incubator-beam git commit: Allow BoundedWindow subclasses in DoFn parameter list
Allow BoundedWindow subclasses in DoFn parameter list Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c3e59fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c3e59fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c3e59fa Branch: refs/heads/gearpump-runner Commit: 9c3e59fab86e93477f14e0709ae8ecc37b84f3ef Parents: 85b908b Author: Kenneth KnowlesAuthored: Thu Nov 3 21:30:25 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../org/apache/beam/sdk/transforms/ParDo.java | 43 +- .../sdk/transforms/reflect/DoFnInvokers.java| 8 ++- .../sdk/transforms/reflect/DoFnSignature.java | 40 + .../sdk/transforms/reflect/DoFnSignatures.java | 41 +++-- .../beam/sdk/transforms/windowing/WindowFn.java | 12 .../apache/beam/sdk/transforms/ParDoTest.java | 61 .../transforms/reflect/DoFnInvokersTest.java| 6 +- 7 files changed, 190 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 0684a5c..26799c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -32,7 +32,10 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.StringUtils; @@ -41,6 +44,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypedPValue; /** @@ -548,6 +552,42 @@ public class ParDo { } /** + * Perform common validations of the {@link DoFn} against the input {@link PCollection}, for + * example ensuring that the window type expected by the {@link DoFn} matches the window type of + * the {@link PCollection}. + */ + private static void validateWindowType( + PCollection input, Serializable fn) { +// No validation for OldDoFn +if (!(fn instanceof DoFn)) { + return; +} + +DoFnSignature signature = DoFnSignatures.INSTANCE.getSignature((Class) fn.getClass()); + +TypeDescriptor actualWindowT = +input.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor(); + +validateWindowTypeForMethod(actualWindowT, signature.processElement()); +for (OnTimerMethod method : signature.onTimerMethods().values()) { + validateWindowTypeForMethod(actualWindowT, method); +} + } + + private static void validateWindowTypeForMethod( + TypeDescriptor actualWindowT, + MethodWithExtraParameters methodSignature) { +if (methodSignature.windowT() != null) { + checkArgument( + methodSignature.windowT().isSupertypeOf(actualWindowT), + "%s expects window type %s, which is not a supertype of actual window type %s", + methodSignature.targetMethod(), + methodSignature.windowT(), + actualWindowT); +} + } + + /** * Perform common validations of the {@link DoFn}, for example ensuring that state is used * correctly and that its features can be supported. */ @@ -768,6 +808,7 @@ public class ParDo { public PCollection apply(PCollection input) { checkArgument( !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner"); + validateWindowType(input, fn); return PCollection.createPrimitiveOutputInternal( input.getPipeline(), input.getWindowingStrategy(), @@ -1024,7 +1065,7 @@ public class ParDo { public PCollectionTuple apply(PCollection input) { checkArgument( !isSplittable(getOldFn()),
[35/50] incubator-beam git commit: Switch DoFnInvokersTest to use a mock ExtraContextFactory
Switch DoFnInvokersTest to use a mock ExtraContextFactory Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c052d2a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c052d2a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c052d2a1 Branch: refs/heads/gearpump-runner Commit: c052d2a1a1fd36f2c21e63427a9e4a50addf85f7 Parents: 40ff9d4 Author: Kenneth KnowlesAuthored: Thu Nov 3 19:26:36 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../transforms/reflect/DoFnInvokersTest.java| 35 1 file changed, 6 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c052d2a1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 0bfe2be..dbb7955 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction; +import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; @@ -67,41 +68,17 @@ public class DoFnInvokersTest { @Mock private DoFn.InputProvider mockInputProvider; @Mock private DoFn.OutputReceiver mockOutputReceiver; @Mock private WindowingInternals mockWindowingInternals; + @Mock private ExtraContextFactory extraContextFactory; @Mock private OldDoFn mockOldDoFn; - private DoFn.ExtraContextFactory extraContextFactory; - @Before public void setUp() { MockitoAnnotations.initMocks(this); -this.extraContextFactory = -new DoFn.ExtraContextFactory () { - @Override - public BoundedWindow window() { -return mockWindow; - } - - @Override - public DoFn.InputProvider inputProvider() { -return mockInputProvider; - } - - @Override - public DoFn.OutputReceiver outputReceiver() { -return mockOutputReceiver; - } - - @Override - public WindowingInternals windowingInternals() { -return mockWindowingInternals; - } - - @Override - public RestrictionTracker restrictionTracker() { -return null; - } -}; +when(extraContextFactory.window()).thenReturn(mockWindow); +when(extraContextFactory.inputProvider()).thenReturn(mockInputProvider); +when(extraContextFactory.outputReceiver()).thenReturn(mockOutputReceiver); + when(extraContextFactory.windowingInternals()).thenReturn(mockWindowingInternals); } private ProcessContinuation invokeProcessElement(DoFn fn) {
[18/50] incubator-beam git commit: This closes #506
This closes #506 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f6a9733f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f6a9733f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f6a9733f Branch: refs/heads/gearpump-runner Commit: f6a9733f58f6af5248608b7c433fee5a01e3b8b8 Parents: 70255d2 bc28799 Author: Jesse AndersonAuthored: Mon Nov 7 09:40:32 2016 -0800 Committer: Jesse Anderson Committed: Mon Nov 7 09:40:32 2016 -0800 -- .../beam/sdk/transforms/RegexTransform.java | 505 +++ .../beam/sdk/transforms/RegexTransformTest.java | 262 ++ 2 files changed, 767 insertions(+) --
[30/50] incubator-beam git commit: Closes #1299
Closes #1299 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3e84a5f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3e84a5f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3e84a5f3 Branch: refs/heads/gearpump-runner Commit: 3e84a5f3c35a64de5e7be642ff3b7b4dcfd541b0 Parents: 9b47228 367fcac Author: Dan HalperinAuthored: Mon Nov 7 13:33:05 2016 -0800 Committer: Dan Halperin Committed: Mon Nov 7 13:33:05 2016 -0800 -- sdks/java/io/kinesis/pom.xml| 21 ++-- .../beam/sdk/io/kinesis/CustomOptional.java | 21 .../beam/sdk/io/kinesis/KinesisRecord.java | 3 +-- .../beam/sdk/io/kinesis/CustomOptionalTest.java | 10 ++ 4 files changed, 34 insertions(+), 21 deletions(-) --
[20/50] incubator-beam git commit: Format Regex according to style guidelines
Format Regex according to style guidelines Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/79b04551 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/79b04551 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/79b04551 Branch: refs/heads/gearpump-runner Commit: 79b04551c7c9f964908ab4a1d95119ef8a7fff84 Parents: 6954abe Author: Kenneth KnowlesAuthored: Mon Nov 7 10:10:59 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 10:10:59 2016 -0800 -- .../org/apache/beam/sdk/transforms/Regex.java | 525 +-- .../apache/beam/sdk/transforms/RegexTest.java | 106 ++-- 2 files changed, 283 insertions(+), 348 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/79b04551/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java index 27104f6..a94130d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java @@ -24,22 +24,17 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** - * {@code PTransorm}s to use Regular Expressions to process elements in a - * {@link PCollection}. + * {@code PTransorm}s to use Regular Expressions to process elements in a {@link PCollection}. * - * - * {@link Regex#matches(String, int)} can be used to see if an entire line matches - * a Regex. {@link Regex#matchesKV(String, int, int)} can be used to see if an entire - * line matches a Regex and output certain groups as a {@link KV}. - * - * - * {@link Regex#find(String, int)} can be used to see if a portion of a line - * matches a Regex. {@link Regex#matchesKV(String, int, int)} can be used to see if a - * portion of a line matches a Regex and output certain groups as a {@link KV}. - * - * - * Lines that do not match the Regex will not be output. - * + * {@link Regex#matches(String, int)} can be used to see if an entire line matches a Regex. + * {@link Regex#matchesKV(String, int, int)} can be used to see if an entire line matches a Regex + * and output certain groups as a {@link KV}. + * + * {@link Regex#find(String, int)} can be used to see if a portion of a line matches a Regex. + * {@link Regex#matchesKV(String, int, int)} can be used to see if a portion of a line matches a + * Regex and output certain groups as a {@link KV}. + * + * Lines that do not match the Regex will not be output. */ public class Regex { private Regex() { @@ -47,159 +42,135 @@ public class Regex { } /** - * Returns a {@link Regex.Matches} {@link PTransform} that checks if - * the entire line matches the Regex. Returns the entire line (group 0) as a - * {@link PCollection}. - * @param regex - * The regular expression to run + * Returns a {@link Regex.Matches} {@link PTransform} that checks if the entire line matches the + * Regex. Returns the entire line (group 0) as a {@link PCollection}. + * + * @param regex The regular expression to run */ public static Matches matches(String regex) { return matches(regex, 0); } /** - * Returns a {@link Regex.Matches} {@link PTransform} that checks if - * the entire line matches the Regex. Returns the group as a - * {@link PCollection}. - * @param regex - * The regular expression to run - * @param group - * The Regex group to return as a PCollection + * Returns a {@link Regex.Matches} {@link PTransform} that checks if the entire line matches the + * Regex. Returns the group as a {@link PCollection}. + * + * @param regex The regular expression to run + * @param group The Regex group to return as a PCollection */ public static Matches matches(String regex, int group) { return new Matches(regex, group); } /** - * Returns a {@link Regex.MatchesKV} {@link PTransform} that checks - * if the entire line matches the Regex. Returns the specified groups as the - * key and value as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param keyGroup - * The Regex group to use as the key - * @param valueGroup - * The Regex group to use the value + * Returns a {@link Regex.MatchesKV} {@link PTransform} that checks if the entire line matches the + * Regex. Returns the specified groups as the key and value as a {@link PCollection}. + * + * @param regex The regular expression to run + * @param keyGroup The
[33/50] incubator-beam git commit: Update Watermarks even if a Reader is empty
Update Watermarks even if a Reader is empty This ensures that the pipeline will make progress even if a reader stops producing elements. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff7fe07b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff7fe07b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff7fe07b Branch: refs/heads/gearpump-runner Commit: ff7fe07be96de393b763e7b3d213734040aa3795 Parents: 912500f Author: Thomas GrohAuthored: Mon Nov 7 12:59:06 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 7 15:08:43 2016 -0800 -- .../direct/UnboundedReadEvaluatorFactory.java| 6 -- .../UnboundedReadEvaluatorFactoryTest.java | 19 +-- 2 files changed, 17 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff7fe07b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index e529088..fb09b3e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -143,12 +144,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // If the reader had no elements available, but the shard is not done, reuse it later resultBuilder.addUnprocessedElements( Collections. singleton( - element.withValue( + WindowedValue.timestampedValueInGlobalWindow( UnboundedSourceShard.of( shard.getSource(), shard.getDeduplicator(), reader, - shard.getCheckpoint(); + shard.getCheckpoint()), + reader.getWatermark(; } } catch (IOException e) { if (reader != null) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ff7fe07b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java -- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 9a7fec3..18c7cec 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -260,6 +260,7 @@ public class UnboundedReadEvaluatorFactoryTest { (WindowedValue >) Iterables.getOnlyElement(result.getUnprocessedElements()); secondEvaluator.processElement(residual); + TransformResult secondResult = secondEvaluator.finishBundle(); // Sanity check that nothing was output (The test would have to run for more than a day to do @@ -268,11 +269,14 @@ public class UnboundedReadEvaluatorFactoryTest { secondOutput.commit(Instant.now()).getElements(), Matchers. emptyIterable()); -// Test that even though the reader produced no outputs, there is still a residual shard. -UnboundedSourceShard residualShard = -(UnboundedSourceShard ) - Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue(); -assertThat(residualShard.getExistingReader(), not(nullValue())); +// Test that even though the reader produced no outputs, there is still a residual shard with +// the updated watermark. +WindowedValue > unprocessed = +
[47/50] incubator-beam git commit: This closes #1303
This closes #1303 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99505e12 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99505e12 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99505e12 Branch: refs/heads/gearpump-runner Commit: 99505e1256082824aebab3da26128a1e52fd7c17 Parents: 339dee9 e2856fb Author: Thomas GrohAuthored: Mon Nov 7 18:04:51 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 7 18:04:51 2016 -0800 -- .../direct/UnboundedReadEvaluatorFactory.java| 6 ++ .../UnboundedReadEvaluatorFactoryTest.java | 19 ++- 2 files changed, 8 insertions(+), 17 deletions(-) --
[43/50] incubator-beam git commit: This closes #1287
This closes #1287 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c6d9bf29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c6d9bf29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c6d9bf29 Branch: refs/heads/gearpump-runner Commit: c6d9bf29700d6f13a33423183201a18525040e05 Parents: 9de9ce6 a58f1eb Author: Thomas GrohAuthored: Mon Nov 7 15:47:02 2016 -0800 Committer: Thomas Groh Committed: Mon Nov 7 15:47:02 2016 -0800 -- .../beam/runners/direct/DirectRunner.java | 5 + .../direct/ImmutableListBundleFactory.java | 21 ++- .../beam/runners/direct/WatermarkManager.java | 153 +-- .../direct/ImmutableListBundleFactoryTest.java | 15 +- 4 files changed, 141 insertions(+), 53 deletions(-) --
[41/50] incubator-beam git commit: Incrementally update Pending elements when work completes
Incrementally update Pending elements when work completes This reduces the amount of single-threaded updates the monitor thread performs before firing timers. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/317b5e65 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/317b5e65 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/317b5e65 Branch: refs/heads/gearpump-runner Commit: 317b5e6577a623fa8fddeac90e6a3c9510a250e5 Parents: 9de9ce6 Author: Thomas GrohAuthored: Fri Nov 4 11:28:03 2016 -0700 Committer: Thomas Groh Committed: Mon Nov 7 15:47:02 2016 -0800 -- .../beam/runners/direct/WatermarkManager.java | 109 ++- 1 file changed, 83 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/317b5e65/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index f01c13c..2228cd5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -43,7 +43,10 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -682,10 +685,16 @@ public class WatermarkManager { private final ConcurrentLinkedQueue pendingUpdates; /** + * A lock used to control concurrency for updating pending values. + */ + private final Lock refreshLock; + + /** * A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially * stale data. */ - private final ConcurrentLinkedQueue pendingRefreshes; + @GuardedBy("refreshLock") + private final Set pendingRefreshes; /** * Creates a new {@link WatermarkManager}. All watermarks within the newly created @@ -710,7 +719,9 @@ public class WatermarkManager { this.clock = clock; this.consumers = consumers; this.pendingUpdates = new ConcurrentLinkedQueue<>(); -this.pendingRefreshes = new ConcurrentLinkedQueue<>(); + +this.refreshLock = new ReentrantLock(); +this.pendingRefreshes = new HashSet<>(); transformToWatermarks = new HashMap<>(); @@ -795,13 +806,18 @@ public class WatermarkManager { public void initialize( Map > initialBundles) { -for (Map.Entry > rootEntry : -initialBundles.entrySet()) { - TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey()); - for (CommittedBundle initialBundle : rootEntry.getValue()) { -rootWms.addPending(initialBundle); +refreshLock.lock(); +try { + for (Map.Entry > rootEntry : + initialBundles.entrySet()) { +TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey()); +for (CommittedBundle initialBundle : rootEntry.getValue()) { + rootWms.addPending(initialBundle); +} +pendingRefreshes.add(rootEntry.getKey()); } - pendingRefreshes.offer(rootEntry.getKey()); +} finally { + refreshLock.unlock(); } } @@ -834,6 +850,17 @@ public class WatermarkManager { timerUpdate, result, earliestHold)); +tryApplyPendingUpdates(); + } + + private void tryApplyPendingUpdates() { +if (refreshLock.tryLock()) { + try { +applyNUpdates(10); + } finally { +refreshLock.unlock(); + } +} } /** @@ -841,14 +868,24 @@ public class WatermarkManager { * of all {@link TransformWatermarks} to be advanced as far as possible. */ private void applyPendingUpdates() { -Set updatedTransforms = new HashSet<>(); -PendingWatermarkUpdate pending = pendingUpdates.poll(); -while (pending != null) { +refreshLock.lock(); +try { + applyNUpdates(-1); +} finally { + refreshLock.unlock(); +} + } + +
[11/50] incubator-beam git commit: This closes #1272
This closes #1272 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/14e093a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/14e093a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/14e093a0 Branch: refs/heads/gearpump-runner Commit: 14e093a0a574c8c3920a83c38e411a06b29bf44b Parents: 46fbfe0 90a75d1 Author: SelaAuthored: Sat Nov 5 00:02:22 2016 +0200 Committer: Sela Committed: Sat Nov 5 00:02:22 2016 +0200 -- .../runners/spark/SparkPipelineOptions.java | 3 +-- .../SparkRunnerStreamingContextFactory.java | 23 +--- .../streaming/EmptyStreamAssertionTest.java | 3 +-- .../streaming/FlattenStreamingTest.java | 6 ++--- .../streaming/KafkaStreamingTest.java | 6 ++--- .../ResumeFromCheckpointStreamingTest.java | 3 +-- .../streaming/SimpleStreamingWordCountTest.java | 3 +-- .../utils/TestOptionsForStreaming.java | 12 +- 8 files changed, 19 insertions(+), 40 deletions(-) --
[01/50] incubator-beam git commit: This closes #1252
Repository: incubator-beam Updated Branches: refs/heads/gearpump-runner 323ec1188 -> a14927f40 This closes #1252 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/faf55c78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/faf55c78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/faf55c78 Branch: refs/heads/gearpump-runner Commit: faf55c78a30a6937a8b598e674a48f446fcb5424 Parents: 3419fef 8bf6d92 Author: Thomas GrohAuthored: Fri Nov 4 09:57:44 2016 -0700 Committer: Thomas Groh Committed: Fri Nov 4 09:57:44 2016 -0700 -- .../sdk/transforms/reflect/DoFnInvokers.java| 7 +- .../sdk/transforms/reflect/DoFnSignature.java | 44 +- .../sdk/transforms/reflect/DoFnSignatures.java | 752 --- .../apache/beam/sdk/values/TypeDescriptor.java | 13 + .../DoFnSignaturesProcessElementTest.java | 18 +- .../DoFnSignaturesSplittableDoFnTest.java | 19 +- .../transforms/reflect/DoFnSignaturesTest.java | 42 +- .../reflect/DoFnSignaturesTestUtils.java| 13 +- 8 files changed, 578 insertions(+), 330 deletions(-) --
[09/50] incubator-beam git commit: [BEAM-898] Fix Jenkins BigQueryTornadoes IT Failure
[BEAM-898] Fix Jenkins BigQueryTornadoes IT Failure This closes #1279 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46fbfe06 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46fbfe06 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46fbfe06 Branch: refs/heads/gearpump-runner Commit: 46fbfe06bc34f9c355d00f346f63767a861cb858 Parents: 99062d1 1eccd29 Author: Luke CwikAuthored: Fri Nov 4 14:20:14 2016 -0700 Committer: Luke Cwik Committed: Fri Nov 4 14:20:14 2016 -0700 -- .../beam/examples/WindowedWordCountIT.java | 11 --- .../org/apache/beam/examples/WordCountIT.java| 19 +++ .../examples/cookbook/BigQueryTornadoesIT.java | 11 --- 3 files changed, 27 insertions(+), 14 deletions(-) --
[03/50] incubator-beam git commit: Closes #1285
Closes #1285 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f06deac8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f06deac8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f06deac8 Branch: refs/heads/gearpump-runner Commit: f06deac8c10f10b4b64ddc51358b1b551845e595 Parents: faf55c7 f50b214 Author: Dan HalperinAuthored: Fri Nov 4 10:51:29 2016 -0700 Committer: Dan Halperin Committed: Fri Nov 4 10:51:29 2016 -0700 -- examples/java/pom.xml | 5 - examples/java8/pom.xml| 5 - examples/pom.xml | 13 - pom.xml | 3 +++ runners/core-java/pom.xml | 5 - runners/direct-java/pom.xml | 5 - runners/flink/examples/pom.xml| 5 - runners/flink/runner/pom.xml | 5 - runners/google-cloud-dataflow-java/pom.xml| 7 --- runners/pom.xml | 11 +++ runners/spark/pom.xml | 4 sdks/java/build-tools/pom.xml | 4 ++-- .../src/main/resources/beam/checkstyle.xml| 2 +- sdks/java/core/pom.xml| 18 +- sdks/java/extensions/join-library/pom.xml | 4 sdks/java/extensions/sorter/pom.xml | 4 sdks/java/io/google-cloud-platform/pom.xml| 4 sdks/java/io/hdfs/pom.xml | 4 sdks/java/io/jdbc/pom.xml | 6 +- sdks/java/io/jms/pom.xml | 4 sdks/java/io/kafka/pom.xml| 4 sdks/java/io/kinesis/pom.xml | 4 sdks/java/io/mongodb/pom.xml | 4 sdks/java/java8tests/pom.xml | 5 - sdks/java/microbenchmarks/pom.xml | 5 - sdks/java/pom.xml | 4 +++- sdks/pom.xml | 16 +++- 27 files changed, 57 insertions(+), 103 deletions(-) --
[27/50] incubator-beam git commit: Sorter: fix README to support syntax highlighting
Sorter: fix README to support syntax highlighting Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0604d2bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0604d2bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0604d2bb Branch: refs/heads/gearpump-runner Commit: 0604d2bb0a666787545b75b1a70ca2fe6496cddc Parents: baa7fb0 Author: Dan HalperinAuthored: Mon Nov 7 12:18:11 2016 -0800 Committer: Dan Halperin Committed: Mon Nov 7 13:31:47 2016 -0800 -- sdks/java/extensions/sorter/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0604d2bb/sdks/java/extensions/sorter/README.md -- diff --git a/sdks/java/extensions/sorter/README.md b/sdks/java/extensions/sorter/README.md index 80d2a40..18bd0d2 100644 --- a/sdks/java/extensions/sorter/README.md +++ b/sdks/java/extensions/sorter/README.md @@ -28,7 +28,7 @@ This module provides the SortValues transform, which takes a `PCollection >> input = ... // Group by primary key, bringing pairs for the same key together. @@ -39,4 +39,4 @@ PCollection create(new BufferedExternalSorter.Options())); - \ No newline at end of file +```
[26/50] incubator-beam git commit: [BEAM-725] Migrate to use the generic Google credentials library
[BEAM-725] Migrate to use the generic Google credentials library This closes #1294 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/baa7fb03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/baa7fb03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/baa7fb03 Branch: refs/heads/gearpump-runner Commit: baa7fb0317afa4a18462ed66ab0a7ba1e8eb2c89 Parents: b5f8473 bb260ec Author: Luke CwikAuthored: Mon Nov 7 13:09:19 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 7 13:09:19 2016 -0800 -- pom.xml | 21 ++ runners/google-cloud-dataflow-java/pom.xml | 15 +- .../dataflow/util/DataflowTransport.java| 9 +- sdks/java/core/pom.xml | 19 +- .../org/apache/beam/sdk/options/GcpOptions.java | 136 ++--- .../beam/sdk/testing/BigqueryMatcher.java | 15 +- .../apache/beam/sdk/util/CredentialFactory.java | 4 +- .../org/apache/beam/sdk/util/Credentials.java | 192 --- .../beam/sdk/util/GcpCredentialFactory.java | 41 +++- .../beam/sdk/util/NoopCredentialFactory.java| 9 +- .../apache/beam/sdk/util/PubsubGrpcClient.java | 18 +- .../apache/beam/sdk/util/PubsubJsonClient.java | 9 +- .../apache/beam/sdk/util/TestCredential.java| 44 +++-- .../org/apache/beam/sdk/util/Transport.java | 9 +- .../beam/sdk/util/PubsubGrpcClientTest.java | 9 +- sdks/java/io/google-cloud-platform/pom.xml | 9 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 25 ++- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 25 ++- 18 files changed, 186 insertions(+), 423 deletions(-) --
[2/2] incubator-beam git commit: Add OnTimerInvoker(s), for invoking DoFn @OnTimer methods
Add OnTimerInvoker(s), for invoking DoFn @OnTimer methods OnTimerInvoker encapsulates the dispatch from onTimer() to a call to the DoFn method annotated with @OnTimer(). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42f5251d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42f5251d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42f5251d Branch: refs/heads/master Commit: 42f5251dab3874471fc6a4c5ad813932b65ff703 Parents: 99505e1 Author: Kenneth KnowlesAuthored: Mon Oct 31 19:26:38 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 18:30:18 2016 -0800 -- .../sdk/transforms/reflect/DoFnInvokers.java| 4 +- .../sdk/transforms/reflect/OnTimerInvoker.java | 27 ++ .../sdk/transforms/reflect/OnTimerInvokers.java | 271 +++ .../transforms/reflect/OnTimerInvokersTest.java | 109 4 files changed, 409 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index b975711..086ae7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -377,7 +377,7 @@ public class DoFnInvokers { * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a * "target method" of the wrapped {@link DoFn}. */ - private static class DoFnMethodDelegation implements Implementation { + static class DoFnMethodDelegation implements Implementation { /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */ protected final MethodDescription targetMethod; /** Whether the target method returns non-void. */ @@ -529,7 +529,7 @@ public class DoFnInvokers { MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName))); } - private static StackManipulation getExtraContextParameter( + static StackManipulation getExtraContextParameter( DoFnSignature.Parameter parameter, final StackManipulation pushExtraContextFactory) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java new file mode 100644 index 000..de9d667 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvoker.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import org.apache.beam.sdk.transforms.DoFn; + +/** Interface for invoking the {@link DoFn.OnTimer} method for a particular timer. */ +interface OnTimerInvoker { + + /** Invoke the {@link DoFn.OnTimer} method in the provided context. */ + void invokeOnTimer(DoFn.ExtraContextFactory extra); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42f5251d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java new file mode 100644 index 000..b2bace2 --- /dev/null +++
[1/2] incubator-beam git commit: This closes #1256
Repository: incubator-beam Updated Branches: refs/heads/master 99505e125 -> afa0c31bd This closes #1256 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/afa0c31b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/afa0c31b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/afa0c31b Branch: refs/heads/master Commit: afa0c31bd0f9e12e78858a1971e7870a2c14b01a Parents: 99505e1 42f5251 Author: Kenneth KnowlesAuthored: Mon Nov 7 18:30:18 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 18:30:18 2016 -0800 -- .../sdk/transforms/reflect/DoFnInvokers.java| 4 +- .../sdk/transforms/reflect/OnTimerInvoker.java | 27 ++ .../sdk/transforms/reflect/OnTimerInvokers.java | 271 +++ .../transforms/reflect/OnTimerInvokersTest.java | 109 4 files changed, 409 insertions(+), 2 deletions(-) --
[5/6] incubator-beam git commit: Add DoFnInvoker dispatch for State and Timer parameters
Add DoFnInvoker dispatch for State and Timer parameters Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2db8268 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2db8268 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2db8268 Branch: refs/heads/master Commit: e2db82686008aea224ca5cf1ef1acc2831c46ceb Parents: c052d2a Author: Kenneth KnowlesAuthored: Thu Nov 3 19:18:24 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../beam/runners/core/SimpleDoFnRunner.java | 12 +++ .../beam/runners/core/SplittableParDo.java | 12 +++ .../org/apache/beam/sdk/transforms/DoFn.java| 20 .../beam/sdk/transforms/DoFnAdapters.java | 22 .../sdk/transforms/reflect/DoFnInvokers.java| 104 +++ .../transforms/reflect/DoFnInvokersTest.java| 59 ++- 6 files changed, 187 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index dec9905..3abb06b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -48,11 +48,13 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -532,6 +534,16 @@ public class SimpleDoFnRunner implements DoFnRunner windowingInternals() { return new WindowingInternals () { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 33d0ab7..d8ee1d5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -46,9 +46,11 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; @@ -432,6 +434,16 @@ public class SplittableParDo< public TrackerT restrictionTracker() { return tracker; } + + @Override + public State state(String stateId) { +throw new UnsupportedOperationException("State cannot be used with a splittable DoFn"); + } + + @Override + public Timer timer(String timerId) { +throw new UnsupportedOperationException("Timers cannot be used with a splittable DoFn"); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2db8268/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
[6/6] incubator-beam git commit: This closes #1282
This closes #1282 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9de9ce69 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9de9ce69 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9de9ce69 Branch: refs/heads/master Commit: 9de9ce69fa010f46911ac0f9cfbe4df2e475772b Parents: ac252a7 9c3e59f Author: Kenneth KnowlesAuthored: Mon Nov 7 15:25:04 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:04 2016 -0800 -- .../beam/runners/core/SimpleDoFnRunner.java | 12 ++ .../beam/runners/core/SplittableParDo.java | 12 ++ .../org/apache/beam/sdk/transforms/DoFn.java| 20 +++ .../beam/sdk/transforms/DoFnAdapters.java | 22 +++ .../org/apache/beam/sdk/transforms/ParDo.java | 43 +- .../sdk/transforms/reflect/DoFnInvokers.java| 143 +-- .../sdk/transforms/reflect/DoFnSignature.java | 67 ++--- .../sdk/transforms/reflect/DoFnSignatures.java | 56 +++- .../beam/sdk/transforms/windowing/WindowFn.java | 12 ++ .../apache/beam/sdk/transforms/ParDoTest.java | 61 .../transforms/reflect/DoFnInvokersTest.java| 100 - .../DoFnSignaturesSplittableDoFnTest.java | 5 +- .../transforms/reflect/DoFnSignaturesTest.java | 3 +- 13 files changed, 445 insertions(+), 111 deletions(-) --
[4/6] incubator-beam git commit: Switch DoFnInvokersTest to use a mock ExtraContextFactory
Switch DoFnInvokersTest to use a mock ExtraContextFactory Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c052d2a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c052d2a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c052d2a1 Branch: refs/heads/master Commit: c052d2a1a1fd36f2c21e63427a9e4a50addf85f7 Parents: 40ff9d4 Author: Kenneth KnowlesAuthored: Thu Nov 3 19:26:36 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../transforms/reflect/DoFnInvokersTest.java| 35 1 file changed, 6 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c052d2a1/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 0bfe2be..dbb7955 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction; +import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; @@ -67,41 +68,17 @@ public class DoFnInvokersTest { @Mock private DoFn.InputProvider mockInputProvider; @Mock private DoFn.OutputReceiver mockOutputReceiver; @Mock private WindowingInternals mockWindowingInternals; + @Mock private ExtraContextFactory extraContextFactory; @Mock private OldDoFn mockOldDoFn; - private DoFn.ExtraContextFactory extraContextFactory; - @Before public void setUp() { MockitoAnnotations.initMocks(this); -this.extraContextFactory = -new DoFn.ExtraContextFactory () { - @Override - public BoundedWindow window() { -return mockWindow; - } - - @Override - public DoFn.InputProvider inputProvider() { -return mockInputProvider; - } - - @Override - public DoFn.OutputReceiver outputReceiver() { -return mockOutputReceiver; - } - - @Override - public WindowingInternals windowingInternals() { -return mockWindowingInternals; - } - - @Override - public RestrictionTracker restrictionTracker() { -return null; - } -}; +when(extraContextFactory.window()).thenReturn(mockWindow); +when(extraContextFactory.inputProvider()).thenReturn(mockInputProvider); +when(extraContextFactory.outputReceiver()).thenReturn(mockOutputReceiver); + when(extraContextFactory.windowingInternals()).thenReturn(mockWindowingInternals); } private ProcessContinuation invokeProcessElement(DoFn fn) {
[3/6] incubator-beam git commit: Generalize extraction of DoFn parameters from context
Generalize extraction of DoFn parameters from context Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/40ff9d40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/40ff9d40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/40ff9d40 Branch: refs/heads/master Commit: 40ff9d401f0ba3f85d1bab848d1c6a662b03bc99 Parents: ac252a7 Author: Kenneth KnowlesAuthored: Thu Nov 3 18:42:25 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../sdk/transforms/reflect/DoFnInvokers.java| 115 --- .../sdk/transforms/reflect/DoFnSignature.java | 11 +- .../sdk/transforms/reflect/DoFnSignatures.java | 15 ++- .../DoFnSignaturesSplittableDoFnTest.java | 5 +- 4 files changed, 94 insertions(+), 52 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/40ff9d40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index c5a23dc..ad2b766 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -69,6 +69,13 @@ import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BoundedWindowParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.TypeDescriptor; @@ -496,40 +503,76 @@ public class DoFnInvokers { } } + private static StackManipulation simpleExtraContextParameter( +String methodName, +StackManipulation pushExtraContextFactory) { +try { + return new StackManipulation.Compound( +pushExtraContextFactory, +MethodInvocation.invoke( +new MethodDescription.ForLoadedMethod( +DoFn.ExtraContextFactory.class.getMethod(methodName; +} catch (Exception e) { + throw new IllegalStateException( + String.format( + "Failed to locate required method %s.%s", + ExtraContextFactory.class.getSimpleName(), methodName), + e); +} + } + + private static StackManipulation getExtraContextParameter( + DoFnSignature.Parameter parameter, + final StackManipulation pushExtraContextFactory) { + +return parameter.match(new Cases() { + + @Override + public StackManipulation dispatch(BoundedWindowParameter p) { +return simpleExtraContextParameter("window", pushExtraContextFactory); + } + + @Override + public StackManipulation dispatch(InputProviderParameter p) { +return simpleExtraContextParameter("inputProvider", pushExtraContextFactory); + } + + @Override + public StackManipulation dispatch(OutputReceiverParameter p) { +return simpleExtraContextParameter("outputReceiver", pushExtraContextFactory); + } + + @Override + public StackManipulation dispatch(RestrictionTrackerParameter p) { +// ExtraContextFactory.restrictionTracker() returns a RestrictionTracker, +// but the @ProcessElement method expects a concrete subtype of it. +// Insert a downcast. +return new StackManipulation.Compound( +simpleExtraContextParameter("restrictionTracker", pushExtraContextFactory), +TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType(; + } + + @Override + public StackManipulation dispatch(StateParameter p) { +throw new UnsupportedOperationException("State parameters are not yet supported."); + } + + @Override + public StackManipulation dispatch(TimerParameter
[2/6] incubator-beam git commit: Rename BoundedWindowParameter -> WindowParameter
Rename BoundedWindowParameter -> WindowParameter Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/85b908be Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/85b908be Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/85b908be Branch: refs/heads/master Commit: 85b908be5b5858ff9dfb5d7f04c0e3ca28dbaa05 Parents: e2db826 Author: Kenneth KnowlesAuthored: Thu Nov 3 21:27:24 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../sdk/transforms/reflect/DoFnInvokers.java| 4 ++-- .../sdk/transforms/reflect/DoFnSignature.java | 24 ++-- .../transforms/reflect/DoFnSignaturesTest.java | 3 ++- 3 files changed, 16 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85b908be/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index b7f75ed..ba95f98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -68,13 +68,13 @@ import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BoundedWindowParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.InputProviderParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.UserCodeException; @@ -537,7 +537,7 @@ public class DoFnInvokers { new Cases() { @Override - public StackManipulation dispatch(BoundedWindowParameter p) { + public StackManipulation dispatch(WindowParameter p) { return simpleExtraContextParameter("window", pushExtraContextFactory); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85b908be/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index a189bd5..befc10b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -33,9 +33,9 @@ import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BoundedWindowParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Timer; @@ -159,8 +159,8 @@ public abstract class DoFnSignature { public ResultT match(Cases cases) { // This could be done with reflection, but since the number of cases is small and known, // they are simply inlined. - if (this instanceof BoundedWindowParameter) { -return cases.dispatch((BoundedWindowParameter) this); + if (this instanceof WindowParameter) { +return cases.dispatch((WindowParameter) this);
[1/6] incubator-beam git commit: Allow BoundedWindow subclasses in DoFn parameter list
Repository: incubator-beam Updated Branches: refs/heads/master ac252a7e1 -> 9de9ce69f Allow BoundedWindow subclasses in DoFn parameter list Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c3e59fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c3e59fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c3e59fa Branch: refs/heads/master Commit: 9c3e59fab86e93477f14e0709ae8ecc37b84f3ef Parents: 85b908b Author: Kenneth KnowlesAuthored: Thu Nov 3 21:30:25 2016 -0700 Committer: Kenneth Knowles Committed: Mon Nov 7 15:25:03 2016 -0800 -- .../org/apache/beam/sdk/transforms/ParDo.java | 43 +- .../sdk/transforms/reflect/DoFnInvokers.java| 8 ++- .../sdk/transforms/reflect/DoFnSignature.java | 40 + .../sdk/transforms/reflect/DoFnSignatures.java | 41 +++-- .../beam/sdk/transforms/windowing/WindowFn.java | 12 .../apache/beam/sdk/transforms/ParDoTest.java | 61 .../transforms/reflect/DoFnInvokersTest.java| 6 +- 7 files changed, 190 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c3e59fa/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 0684a5c..26799c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -32,7 +32,10 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.StringUtils; @@ -41,6 +44,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypedPValue; /** @@ -548,6 +552,42 @@ public class ParDo { } /** + * Perform common validations of the {@link DoFn} against the input {@link PCollection}, for + * example ensuring that the window type expected by the {@link DoFn} matches the window type of + * the {@link PCollection}. + */ + private static void validateWindowType( + PCollection input, Serializable fn) { +// No validation for OldDoFn +if (!(fn instanceof DoFn)) { + return; +} + +DoFnSignature signature = DoFnSignatures.INSTANCE.getSignature((Class) fn.getClass()); + +TypeDescriptor actualWindowT = +input.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor(); + +validateWindowTypeForMethod(actualWindowT, signature.processElement()); +for (OnTimerMethod method : signature.onTimerMethods().values()) { + validateWindowTypeForMethod(actualWindowT, method); +} + } + + private static void validateWindowTypeForMethod( + TypeDescriptor actualWindowT, + MethodWithExtraParameters methodSignature) { +if (methodSignature.windowT() != null) { + checkArgument( + methodSignature.windowT().isSupertypeOf(actualWindowT), + "%s expects window type %s, which is not a supertype of actual window type %s", + methodSignature.targetMethod(), + methodSignature.windowT(), + actualWindowT); +} + } + + /** * Perform common validations of the {@link DoFn}, for example ensuring that state is used * correctly and that its features can be supported. */ @@ -768,6 +808,7 @@ public class ParDo { public PCollection apply(PCollection input) { checkArgument( !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner"); + validateWindowType(input, fn); return PCollection.createPrimitiveOutputInternal( input.getPipeline(), input.getWindowingStrategy(), @@ -1024,7 +1065,7 @@ public class ParDo { public PCollectionTuple
[1/3] incubator-beam git commit: Rename RegexTransform to just Regex
Repository: incubator-beam Updated Branches: refs/heads/master f6a9733f5 -> 110245526 Rename RegexTransform to just Regex Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6954abe4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6954abe4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6954abe4 Branch: refs/heads/master Commit: 6954abe4e28103f5ddd3e1eebe998b765cd9de11 Parents: f6a9733 Author: Kenneth KnowlesAuthored: Mon Nov 7 10:10:32 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 10:10:32 2016 -0800 -- .../org/apache/beam/sdk/transforms/Regex.java | 505 +++ .../beam/sdk/transforms/RegexTransform.java | 505 --- .../apache/beam/sdk/transforms/RegexTest.java | 262 ++ .../beam/sdk/transforms/RegexTransformTest.java | 262 -- 4 files changed, 767 insertions(+), 767 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6954abe4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java new file mode 100644 index 000..27104f6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@code PTransorm}s to use Regular Expressions to process elements in a + * {@link PCollection}. + * + * + * {@link Regex#matches(String, int)} can be used to see if an entire line matches + * a Regex. {@link Regex#matchesKV(String, int, int)} can be used to see if an entire + * line matches a Regex and output certain groups as a {@link KV}. + * + * + * {@link Regex#find(String, int)} can be used to see if a portion of a line + * matches a Regex. {@link Regex#matchesKV(String, int, int)} can be used to see if a + * portion of a line matches a Regex and output certain groups as a {@link KV}. + * + * + * Lines that do not match the Regex will not be output. + * + */ +public class Regex { + private Regex() { +// do not instantiate + } + + /** + * Returns a {@link Regex.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the entire line (group 0) as a + * {@link PCollection}. + * @param regex + * The regular expression to run + */ + public static Matches matches(String regex) { +return matches(regex, 0); + } + + /** + * Returns a {@link Regex.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the group as a + * {@link PCollection}. + * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Matches matches(String regex, int group) { +return new Matches(regex, group); + } + + /** + * Returns a {@link Regex.MatchesKV} {@link PTransform} that checks + * if the entire line matches the Regex. Returns the specified groups as the + * key and value as a {@link PCollection}. + * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use the value + */ + public static MatchesKV matchesKV(String regex, int keyGroup, + int valueGroup) { +return new MatchesKV(regex, keyGroup, valueGroup); + } + + /** + * Returns a {@link Regex.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the entire line (group 0) as + * a {@link
[3/3] incubator-beam git commit: This closes #1296
This closes #1296 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11024552 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11024552 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11024552 Branch: refs/heads/master Commit: 110245526d204287ffb30d9b8e91aca4003542f6 Parents: f6a9733 79b0455 Author: Kenneth KnowlesAuthored: Mon Nov 7 10:28:21 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 10:28:21 2016 -0800 -- .../org/apache/beam/sdk/transforms/Regex.java | 454 + .../beam/sdk/transforms/RegexTransform.java | 505 --- .../apache/beam/sdk/transforms/RegexTest.java | 248 + .../beam/sdk/transforms/RegexTransformTest.java | 262 -- 4 files changed, 702 insertions(+), 767 deletions(-) --
[2/3] incubator-beam git commit: Format Regex according to style guidelines
Format Regex according to style guidelines Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/79b04551 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/79b04551 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/79b04551 Branch: refs/heads/master Commit: 79b04551c7c9f964908ab4a1d95119ef8a7fff84 Parents: 6954abe Author: Kenneth KnowlesAuthored: Mon Nov 7 10:10:59 2016 -0800 Committer: Kenneth Knowles Committed: Mon Nov 7 10:10:59 2016 -0800 -- .../org/apache/beam/sdk/transforms/Regex.java | 525 +-- .../apache/beam/sdk/transforms/RegexTest.java | 106 ++-- 2 files changed, 283 insertions(+), 348 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/79b04551/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java index 27104f6..a94130d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Regex.java @@ -24,22 +24,17 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** - * {@code PTransorm}s to use Regular Expressions to process elements in a - * {@link PCollection}. + * {@code PTransorm}s to use Regular Expressions to process elements in a {@link PCollection}. * - * - * {@link Regex#matches(String, int)} can be used to see if an entire line matches - * a Regex. {@link Regex#matchesKV(String, int, int)} can be used to see if an entire - * line matches a Regex and output certain groups as a {@link KV}. - * - * - * {@link Regex#find(String, int)} can be used to see if a portion of a line - * matches a Regex. {@link Regex#matchesKV(String, int, int)} can be used to see if a - * portion of a line matches a Regex and output certain groups as a {@link KV}. - * - * - * Lines that do not match the Regex will not be output. - * + * {@link Regex#matches(String, int)} can be used to see if an entire line matches a Regex. + * {@link Regex#matchesKV(String, int, int)} can be used to see if an entire line matches a Regex + * and output certain groups as a {@link KV}. + * + * {@link Regex#find(String, int)} can be used to see if a portion of a line matches a Regex. + * {@link Regex#matchesKV(String, int, int)} can be used to see if a portion of a line matches a + * Regex and output certain groups as a {@link KV}. + * + * Lines that do not match the Regex will not be output. */ public class Regex { private Regex() { @@ -47,159 +42,135 @@ public class Regex { } /** - * Returns a {@link Regex.Matches} {@link PTransform} that checks if - * the entire line matches the Regex. Returns the entire line (group 0) as a - * {@link PCollection}. - * @param regex - * The regular expression to run + * Returns a {@link Regex.Matches} {@link PTransform} that checks if the entire line matches the + * Regex. Returns the entire line (group 0) as a {@link PCollection}. + * + * @param regex The regular expression to run */ public static Matches matches(String regex) { return matches(regex, 0); } /** - * Returns a {@link Regex.Matches} {@link PTransform} that checks if - * the entire line matches the Regex. Returns the group as a - * {@link PCollection}. - * @param regex - * The regular expression to run - * @param group - * The Regex group to return as a PCollection + * Returns a {@link Regex.Matches} {@link PTransform} that checks if the entire line matches the + * Regex. Returns the group as a {@link PCollection}. + * + * @param regex The regular expression to run + * @param group The Regex group to return as a PCollection */ public static Matches matches(String regex, int group) { return new Matches(regex, group); } /** - * Returns a {@link Regex.MatchesKV} {@link PTransform} that checks - * if the entire line matches the Regex. Returns the specified groups as the - * key and value as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param keyGroup - * The Regex group to use as the key - * @param valueGroup - * The Regex group to use the value + * Returns a {@link Regex.MatchesKV} {@link PTransform} that checks if the entire line matches the + * Regex. Returns the specified groups as the key and value as a {@link PCollection}. + * + * @param regex The regular expression to run + * @param keyGroup The Regex group
[2/2] incubator-beam git commit: This closes #1265
This closes #1265 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a1fdee5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a1fdee5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a1fdee5 Branch: refs/heads/master Commit: 2a1fdee5c617848e90896db5cafe685331132fef Parents: eac68cb 3cef71e Author: Kenneth KnowlesAuthored: Thu Nov 3 15:10:57 2016 -0700 Committer: Kenneth Knowles Committed: Thu Nov 3 15:10:57 2016 -0700 -- pom.xml | 73 +++- 1 file changed, 48 insertions(+), 25 deletions(-) --
[1/2] incubator-beam git commit: Move the separate package-info.java compile to java7 profile since it's not needed with java8 This fixes compiling in Eclipse (assuming Neon which require java8) Sligh
Repository: incubator-beam Updated Branches: refs/heads/master eac68cb21 -> 2a1fdee5c Move the separate package-info.java compile to java7 profile since it's not needed with java8 This fixes compiling in Eclipse (assuming Neon which require java8) Slightly speeds up Java8 compile (one invoke of compiler plugin) Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3cef71e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3cef71e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3cef71e0 Branch: refs/heads/master Commit: 3cef71e0653c24c44aef3515ce80c70eceac37c6 Parents: eac68cb Author: Daniel KulpAuthored: Wed Nov 2 16:14:00 2016 -0400 Committer: Kenneth Knowles Committed: Thu Nov 3 15:10:56 2016 -0700 -- pom.xml | 73 +++- 1 file changed, 48 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3cef71e0/pom.xml -- diff --git a/pom.xml b/pom.xml index cb1e4d6..ea7d4ae 100644 --- a/pom.xml +++ b/pom.xml @@ -133,6 +133,8 @@ 4.4.1 -Werror + -Xpkginfo:always +nothing pom @@ -224,6 +226,48 @@ -Xdoclint:-missing + + java7-packageinfo + +1.7 + + + + -Xpkginfo:legacy + **/package-info.java + + + + +maven-compiler-plugin + + + +compile-package-info + + compile + +compile + + +-Xpkginfo:always + + +**/package-info.java + + + + + + + + eclipse-jdt @@ -832,10 +876,6 @@ - default-compile @@ -843,29 +883,12 @@ compile - - **/package-info.java - - - - - - - compile-package-info - -compile - - compile - - -Xpkginfo:always + ${compiler.default.pkginfo.flag} - - **/package-info.java - + + ${compiler.default.exclude} +
[1/2] incubator-beam git commit: [BEAM-79] Port Gearpump runner from OldDoFn to new DoFn
Repository: incubator-beam Updated Branches: refs/heads/gearpump-runner 3933b5577 -> 323ec1188 [BEAM-79] Port Gearpump runner from OldDoFn to new DoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45570b9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45570b9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45570b9c Branch: refs/heads/gearpump-runner Commit: 45570b9c7ebb11080deca3346fc601c69796612a Parents: 3933b55 Author: manuzhangAuthored: Mon Oct 31 11:52:22 2016 +0800 Committer: Kenneth Knowles Committed: Thu Nov 3 09:38:41 2016 -0700 -- .../gearpump/GearpumpPipelineTranslator.java| 2 +- .../translators/ParDoBoundMultiTranslator.java | 17 +- .../translators/ParDoBoundTranslator.java | 3 +- .../translators/functions/DoFnFunction.java | 19 +- .../translators/utils/DoFnRunnerFactory.java| 77 +++ .../translators/utils/GearpumpDoFnRunner.java | 516 --- .../utils/NoOpAggregatorFactory.java| 41 ++ 7 files changed, 143 insertions(+), 532 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java -- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java index 5045ae4..8588fff 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java @@ -108,7 +108,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { @Override public void visitValue(PValue value, TransformTreeNode producer) { -LOG.info("visiting value {}", value); +LOG.debug("visiting value {}", value); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java -- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java index 2b49684..54f1c3f 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java @@ -27,11 +27,11 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; -import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner; +import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory; +import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory; import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader; import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; @@ -64,7 +64,7 @@ public class ParDoBoundMultiTranslator implements JavaStream >> outputStream = inputStream.flatMap( new DoFnMultiFunction<>( context.getPipelineOptions(), -transform.getFn(), +transform.getNewFn(), transform.getMainOutputTag(), transform.getSideOutputTags(), inputT.getWindowingStrategy(), @@ -87,18 +87,19 @@ public class ParDoBoundMultiTranslator implements FlatMapFunction >>, DoFnRunners.OutputManager { -private final DoFnRunner doFnRunner; +private final DoFnRunnerFactory doFnRunnerFactory; +private DoFnRunner doFnRunner; private final List >> outputs = Lists .newArrayList(); public DoFnMultiFunction( GearpumpPipelineOptions pipelineOptions, -OldDoFn doFn, +DoFn doFn,
[2/2] incubator-beam git commit: This closes #1234
This closes #1234 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/323ec118 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/323ec118 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/323ec118 Branch: refs/heads/gearpump-runner Commit: 323ec1188d2dffcdad640701e1827f90965994a8 Parents: 3933b55 45570b9 Author: Kenneth KnowlesAuthored: Thu Nov 3 09:39:17 2016 -0700 Committer: Kenneth Knowles Committed: Thu Nov 3 09:39:17 2016 -0700 -- .../gearpump/GearpumpPipelineTranslator.java| 2 +- .../translators/ParDoBoundMultiTranslator.java | 17 +- .../translators/ParDoBoundTranslator.java | 3 +- .../translators/functions/DoFnFunction.java | 19 +- .../translators/utils/DoFnRunnerFactory.java| 77 +++ .../translators/utils/GearpumpDoFnRunner.java | 516 --- .../utils/NoOpAggregatorFactory.java| 41 ++ 7 files changed, 143 insertions(+), 532 deletions(-) --
[3/3] incubator-beam git commit: This closes #968
This closes #968 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e0cfe5b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e0cfe5b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e0cfe5b Branch: refs/heads/master Commit: 7e0cfe5b9c2ac2d0aeaf833a5e3097ce25a40c03 Parents: f6ad58d a36f1fa Author: Kenneth KnowlesAuthored: Tue Nov 1 15:22:04 2016 -0700 Committer: Kenneth Knowles Committed: Tue Nov 1 15:22:04 2016 -0700 -- .../apache/beam/sdk/values/TypeDescriptor.java | 35 --- .../apache/beam/sdk/values/TypeDescriptors.java | 1 - .../apache/beam/sdk/values/TypeParameter.java | 62 .../beam/sdk/values/TypeDescriptorTest.java | 12 4 files changed, 100 insertions(+), 10 deletions(-) --
[1/3] incubator-beam git commit: Added a where method on TypeDescriptor to allow substituting a type parameter with a TypeDescriptor. In the process introduced a TypeParameter class which represents t
Repository: incubator-beam Updated Branches: refs/heads/master f6ad58d60 -> 7e0cfe5b9 Added a where method on TypeDescriptor to allow substituting a type parameter with a TypeDescriptor. In the process introduced a TypeParameter class which represents the parameter type. This is useful when having a type such as Set to be able to specify what T is as part of the TypeDescriptor information. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/699075e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/699075e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/699075e2 Branch: refs/heads/master Commit: 699075e250973447a9cdce1eb9c52d3406788212 Parents: f6ad58d Author: Jeremie Lenfant-EngelmannAuthored: Thu Oct 20 19:29:39 2016 -0700 Committer: Kenneth Knowles Committed: Tue Nov 1 15:14:24 2016 -0700 -- .../apache/beam/sdk/values/TypeDescriptor.java | 35 +++ .../apache/beam/sdk/values/TypeDescriptors.java | 1 - .../apache/beam/sdk/values/TypeParameter.java | 45 .../beam/sdk/values/TypeDescriptorTest.java | 12 ++ 4 files changed, 83 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/699075e2/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java index 724b8b6..6eabf42 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.values; import com.google.common.collect.Lists; import com.google.common.reflect.Invokable; import com.google.common.reflect.Parameter; -import com.google.common.reflect.TypeParameter; +import com.google.common.reflect.TypeResolver; import com.google.common.reflect.TypeToken; import java.io.Serializable; import java.lang.reflect.Field; @@ -171,14 +171,6 @@ public abstract class TypeDescriptor implements Serializable { } /** - * Creates a new {@link SimpleTypeDescriptor} using the {@link #token}. - * Package visible so this isn't abused. - */ - TypeDescriptor where(TypeParameter typeParam, TypeDescriptor typeDescriptor) { -return new SimpleTypeDescriptor<>(token.where(typeParam, typeDescriptor.token)); - } - - /** * Returns the {@link Type} represented by this {@link TypeDescriptor}. */ public Type getType() { @@ -322,6 +314,31 @@ public abstract class TypeDescriptor implements Serializable { return classes; } + /** + * Returns a new {@code TypeDescriptor} where type variables represented by + * {@code typeParameter} are substituted by {@code typeDescriptor}. For example, it can be used to + * construct {@code Map } for any {@code K} and {@code V} type: {@code + * static TypeDescriptor
[2/3] incubator-beam git commit: Add header
Add header Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a36f1fa8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a36f1fa8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a36f1fa8 Branch: refs/heads/master Commit: a36f1fa89079686cb1ce25a3c0abd3dd4f328c92 Parents: 699075e Author: Kenneth KnowlesAuthored: Tue Nov 1 15:21:10 2016 -0700 Committer: Kenneth Knowles Committed: Tue Nov 1 15:21:10 2016 -0700 -- .../org/apache/beam/sdk/values/TypeParameter.java | 17 + 1 file changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a36f1fa8/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeParameter.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeParameter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeParameter.java index a9a658e..dd9d59c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeParameter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeParameter.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.values; import static com.google.common.base.Preconditions.checkArgument;
[1/3] incubator-beam git commit: Delete the obsolete ExecutableTrigger
Repository: incubator-beam Updated Branches: refs/heads/master af1764785 -> 978c99e9d Delete the obsolete ExecutableTrigger Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1abe47bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1abe47bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1abe47bb Branch: refs/heads/master Commit: 1abe47bb566cf2bda700b602161c2139e38121e4 Parents: 97cd3e5 Author: Kenneth KnowlesAuthored: Tue Oct 25 13:42:53 2016 -0700 Committer: Kenneth Knowles Committed: Mon Oct 31 19:22:49 2016 -0700 -- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 3 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 3 +- .../beam/runners/core/ReduceFnTester.java | 6 +- .../direct/WatermarkCallbackExecutor.java | 3 +- .../dataflow/DataflowPipelineTranslator.java| 2 +- .../org/apache/beam/sdk/transforms/Flatten.java | 3 +- .../apache/beam/sdk/transforms/GroupByKey.java | 4 +- .../beam/sdk/transforms/windowing/Window.java | 2 +- .../apache/beam/sdk/util/ExecutableTrigger.java | 131 --- .../apache/beam/sdk/util/WindowingStrategy.java | 10 +- .../sdk/transforms/windowing/WindowTest.java| 6 +- .../beam/sdk/util/ExecutableTriggerTest.java| 109 --- 12 files changed, 19 insertions(+), 263 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 75a5aa7..dde883c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -84,8 +84,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn< key, windowingStrategy, ExecutableTriggerStateMachine.create( -TriggerStateMachines.stateMachineForTrigger( -windowingStrategy.getTrigger().getSpec())), + TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), stateInternals, timerInternals, c.windowingInternals(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 4dea775..f1a6ded 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -73,8 +73,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn { nonCombining(WindowingStrategy windowingStrategy) throws Exception { return new ReduceFnTester ( windowingStrategy, - TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger().getSpec()), +
[3/3] incubator-beam git commit: This closes #1187
This closes #1187 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/978c99e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/978c99e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/978c99e9 Branch: refs/heads/master Commit: 978c99e9d699f79d25662c98c091114b8f608ee7 Parents: af17647 1abe47b Author: Kenneth KnowlesAuthored: Mon Oct 31 20:40:02 2016 -0700 Committer: Kenneth Knowles Committed: Mon Oct 31 20:40:02 2016 -0700 -- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 3 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 3 +- .../beam/runners/core/ReduceFnTester.java | 6 +- .../direct/WatermarkCallbackExecutor.java | 3 +- .../dataflow/DataflowPipelineTranslator.java| 2 +- .../beam/runners/dataflow/DataflowRunner.java | 4 +- .../org/apache/beam/sdk/transforms/Flatten.java | 3 +- .../apache/beam/sdk/transforms/GroupByKey.java | 4 +- .../beam/sdk/transforms/windowing/Window.java | 2 +- .../apache/beam/sdk/util/ExecutableTrigger.java | 131 --- .../apache/beam/sdk/util/WindowingStrategy.java | 10 +- .../sdk/transforms/windowing/WindowTest.java| 6 +- .../beam/sdk/util/ExecutableTriggerTest.java| 109 --- 13 files changed, 21 insertions(+), 265 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/978c99e9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java --
[2/3] incubator-beam git commit: Update Dataflow worker image
Update Dataflow worker image Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/97cd3e5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/97cd3e5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/97cd3e5c Branch: refs/heads/master Commit: 97cd3e5c02bf8c048029b51d7588a110eb5ce62d Parents: 7e45830 Author: Kenneth KnowlesAuthored: Mon Oct 31 11:27:36 2016 -0700 Committer: Kenneth Knowles Committed: Mon Oct 31 19:22:49 2016 -0700 -- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97cd3e5c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 2943ab9..ce126db 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -208,9 +208,9 @@ public class DataflowRunner extends PipelineRunner { // Default Docker container images that execute Dataflow worker harness, residing in Google // Container Registry, separately for Batch and Streaming. public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE = - "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161026"; + "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161031"; public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE = - "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161026"; + "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161031"; // The limit of CreateJob request size. private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
[1/2] incubator-beam git commit: Add static factory methods for DoFnInfo
Repository: incubator-beam Updated Branches: refs/heads/master 7e4583027 -> af1764785 Add static factory methods for DoFnInfo This provides a clearer migration path away from deprecated code paths than constructor overloads which can only be selected by upcasting, etc. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d4922945 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d4922945 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d4922945 Branch: refs/heads/master Commit: d492294520aa721bdb4906177a7ac83e139003b0 Parents: 7e45830 Author: Kenneth KnowlesAuthored: Thu Oct 27 19:01:03 2016 -0700 Committer: Kenneth Knowles Committed: Mon Oct 31 19:19:40 2016 -0700 -- .../dataflow/DataflowPipelineTranslator.java | 6 -- .../beam/runners/dataflow/util/DoFnInfo.java | 18 ++ 2 files changed, 18 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4922945/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 12aa696..0c5ac15 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -1067,8 +1067,10 @@ public class DataflowPipelineTranslator { context.addInput(PropertyNames.USER_FN, fn.getClass().getName()); context.addInput( PropertyNames.SERIALIZED_FN, -byteArrayToJsonString(serializeToByteArray( -new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap; +byteArrayToJsonString( +serializeToByteArray( +DoFnInfo.forFn( +fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap; } private static BiMap translateOutputs( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4922945/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index bfa12e2..b84def8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -44,10 +44,20 @@ public class DoFnInfo implements Serializable { private final Map outputMap; /** - * Creates a {@link DoFnInfo} for the given {@link DoFn} or {@link OldDoFn} and auxiliary bits and - * pieces. + * Creates a {@link DoFnInfo} for the given {@link Serializable} object, which is expected to be a + * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob. */ - public DoFnInfo( + public static DoFnInfo forFn( + Serializable doFn, + WindowingStrategy windowingStrategy, + Iterable sideInputViews, + Coder inputCoder, + long mainOutput, + Map outputMap) { +return new DoFnInfo(doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); + } + + private DoFnInfo( Serializable doFn, WindowingStrategy windowingStrategy, Iterable sideInputViews, @@ -63,7 +73,7 @@ public class DoFnInfo implements Serializable { } /** - * @deprecated call the constructor with a {@link Serializable} + * @deprecated use {@link #forFn}. */ @Deprecated public DoFnInfo(
[2/2] incubator-beam git commit: This closes #1215
This closes #1215 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af176478 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af176478 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af176478 Branch: refs/heads/master Commit: af1764785f2033dd2e42b4cc2bf7f35793759a51 Parents: 7e45830 d492294 Author: Kenneth KnowlesAuthored: Mon Oct 31 19:20:28 2016 -0700 Committer: Kenneth Knowles Committed: Mon Oct 31 19:20:28 2016 -0700 -- .../dataflow/DataflowPipelineTranslator.java | 6 -- .../beam/runners/dataflow/util/DoFnInfo.java | 18 ++ 2 files changed, 18 insertions(+), 6 deletions(-) --
[3/3] incubator-beam git commit: This closes #1226
This closes #1226 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d8eb8be1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d8eb8be1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d8eb8be1 Branch: refs/heads/master Commit: d8eb8be13737d625d03ce1bbb012f0a57e0260d3 Parents: 54a7374 1e5de72 Author: Kenneth KnowlesAuthored: Mon Oct 31 10:25:50 2016 -0700 Committer: Kenneth Knowles Committed: Mon Oct 31 10:25:50 2016 -0700 -- pom.xml | 2 +- runners/core-java/pom.xml| 1 - .../main/java/org/apache/beam/runners/spark/io/SourceRDD.java| 4 +++- 3 files changed, 4 insertions(+), 3 deletions(-) --
[2/3] incubator-beam git commit: Fixes a compile error in SourceRDD
Fixes a compile error in SourceRDD Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63cbb974 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63cbb974 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63cbb974 Branch: refs/heads/master Commit: 63cbb9742d836dff250ba6d8d447dc54d5b76a54 Parents: 54a7374 Author: Eugene KirpichovAuthored: Fri Oct 28 15:21:00 2016 -0700 Committer: Kenneth Knowles Committed: Mon Oct 31 10:25:48 2016 -0700 -- .../main/java/org/apache/beam/runners/spark/io/SourceRDD.java| 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63cbb974/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index 3995c89..cf37b3a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -216,8 +216,10 @@ public class SourceRDD { private final MicrobatchSource microbatchSource; private final SparkRuntimeContext runtimeContext; +// to satisfy Scala API. private static final scala.collection.immutable.List NIL = -scala.collection.immutable.List.empty(); +scala.collection.JavaConversions +.asScalaBuffer(Collections. emptyList()).toList(); public Unbounded(SparkContext sc, SparkRuntimeContext runtimeContext,