This is an automated email from the ASF dual-hosted git repository. kenn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new d05da8c Pin spotless and googleJavaFormat to latest; apply globally d05da8c is described below commit d05da8cf3dd74b2e5496ca85c10f0965c68cadf5 Author: Kenneth Knowles <k...@apache.org> AuthorDate: Mon Jan 14 15:19:14 2019 -0800 Pin spotless and googleJavaFormat to latest; apply globally --- build.gradle | 2 +- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- .../euphoria/core/client/operator/Distinct.java | 13 ++-- .../euphoria/core/client/operator/Join.java | 13 ++-- .../euphoria/core/client/operator/ReduceByKey.java | 13 ++-- .../core/docs/DocumentationExamplesTest.java | 58 +++++++++-------- .../euphoria/core/testkit/WindowingTest.java | 73 +++++++++++----------- .../translate/BroadcastHashJoinTranslatorTest.java | 14 ++--- .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 2 +- .../java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 9 ++- .../apache/beam/sdk/io/kafka/KafkaRecordCoder.java | 2 +- .../beam/sdk/io/kafka/KafkaUnboundedReader.java | 12 ++-- .../CustomTimestampPolicyWithLimitedDelayTest.java | 3 +- .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 15 ++--- .../sdk/io/kinesis/DynamicCheckpointGenerator.java | 3 +- .../sdk/io/kinesis/GetKinesisRecordsResult.java | 3 +- .../beam/sdk/io/kinesis/ShardReadersPool.java | 4 +- .../beam/sdk/io/kinesis/AmazonKinesisMock.java | 3 +- .../java/org/apache/beam/sdk/io/kudu/KuduIO.java | 4 +- .../beam/sdk/io/rabbitmq/RabbitMqIOTest.java | 12 ++-- .../sdk/io/synthetic/SyntheticBoundedSource.java | 3 +- .../sdk/io/synthetic/SyntheticUnboundedSource.java | 4 +- .../beam/sdk/io/synthetic/BundleSplitterTest.java | 6 +- .../java/org/apache/beam/sdk/io/tika/TikaIO.java | 2 +- .../org/apache/beam/sdk/nexmark/NexmarkUtils.java | 4 +- .../org/apache/beam/sdk/nexmark/PubsubHelper.java | 8 +-- .../apache/beam/sdk/nexmark/queries/Query10.java | 2 +- .../apache/beam/sdk/nexmark/queries/Query3.java | 2 +- .../nexmark/queries/SessionSideInputJoinModel.java | 6 +- .../sdk/testutils/publishing/BigQueryClient.java | 4 +- .../publishing/BigQueryResultsPublisher.java | 5 +- 31 files changed, 141 insertions(+), 165 deletions(-) diff --git a/build.gradle b/build.gradle index f8d8ef2..bb9de48 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ buildscript { classpath "io.spring.gradle:propdeps-plugin:0.0.9.RELEASE" // Enable provided and optional configurations classpath "gradle.plugin.org.nosphere.apache:creadur-rat-gradle:0.3.1" // Enable Apache license enforcement classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.11.0" // Enable Avro code generation - classpath "com.diffplug.spotless:spotless-plugin-gradle:3.7.0" // Enable a code formatting plugin + classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0" // Enable a code formatting plugin classpath "gradle.plugin.com.github.blindpirate:gogradle:0.10" // Enable Go code compilation classpath "gradle.plugin.com.palantir.gradle.docker:gradle-docker:0.20.1" // Enable building Docker containers classpath "cz.malohlava:visteg:1.0.3" // Enable generating Gradle task dependencies as ".dot" files diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 126b4c9..1d7ba4b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -727,7 +727,7 @@ class BeamModulePlugin implements Plugin<Project> { enforceCheck !disableSpotlessCheck java { licenseHeader javaLicenseHeader - googleJavaFormat() + googleJavaFormat('1.7') // Details see: https://github.com/diffplug/spotless/blob/master/PADDEDCELL.md paddedCell() diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java index 1cb8d34..942e8f3 100644 --- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java +++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java @@ -62,13 +62,12 @@ import org.joda.time.Duration; */ @Audience(Audience.Type.CLIENT) @Recommended( - reason = - "Might be useful to override the default " - + "implementation because of performance reasons" - + "(e.g. using bloom filters), which might reduce the space complexity", - state = StateComplexity.CONSTANT, - repartitions = 1 -) + reason = + "Might be useful to override the default " + + "implementation because of performance reasons" + + "(e.g. using bloom filters), which might reduce the space complexity", + state = StateComplexity.CONSTANT, + repartitions = 1) public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT, OutputT> implements CompositeOperator<InputT, OutputT> { diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java index ddd9eb0..e90d244 100644 --- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java +++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java @@ -71,13 +71,12 @@ import org.joda.time.Duration; */ @Audience(Audience.Type.CLIENT) @Recommended( - reason = - "Might be useful to override because of performance reasons in a " - + "specific join types (e.g. sort join), which might reduce the space " - + "complexity", - state = StateComplexity.LINEAR, - repartitions = 1 -) + reason = + "Might be useful to override because of performance reasons in a " + + "specific join types (e.g. sort join), which might reduce the space " + + "complexity", + state = StateComplexity.LINEAR, + repartitions = 1) public class Join<LeftT, RightT, KeyT, OutputT> extends ShuffleOperator<Object, KeyT, KV<KeyT, OutputT>> { diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java index af2a518..6b1eb42 100644 --- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java +++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java @@ -87,13 +87,12 @@ import org.joda.time.Duration; */ @Audience(Audience.Type.CLIENT) @Recommended( - reason = - "Is very recommended to override because of performance in " - + "a specific area of (mostly) batch calculations where combiners " - + "can be efficiently used in the executor-specific implementation", - state = StateComplexity.CONSTANT_IF_COMBINABLE, - repartitions = 1 -) + reason = + "Is very recommended to override because of performance in " + + "a specific area of (mostly) batch calculations where combiners " + + "can be efficiently used in the executor-specific implementation", + state = StateComplexity.CONSTANT_IF_COMBINABLE, + repartitions = 1) public class ReduceByKey<InputT, KeyT, ValueT, OutputT> extends ShuffleOperator<InputT, KeyT, KV<KeyT, OutputT>> implements TypeAware.Value<ValueT> { diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java index cf8f896..4b2eb8c 100644 --- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java +++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java @@ -207,15 +207,15 @@ public class DocumentationExamplesTest { final PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); - //Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type + // Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type KryoCoderProvider.of().registerTo(pipeline); - //Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types + // Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types options.as(KryoOptions.class).setKryoRegistrationRequired(true); KryoCoderProvider.of( - kryo -> { //KryoRegistrar of your uwn - kryo.register(KryoSerializedElementType.class); //other may follow + kryo -> { // KryoRegistrar of your uwn + kryo.register(KryoSerializedElementType.class); // other may follow }) .registerTo(pipeline); @@ -510,14 +510,15 @@ public class DocumentationExamplesTest { new SomeEventObject(3), new SomeEventObject(4))); - // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp + // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods + // returns time-stamp PCollection<SomeEventObject> timeStampedEvents = FlatMap.named("extract-event-time") .of(events) .using((SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e)) .eventTimeBy(SomeEventObject::getEventTimeInMillis) .output(); - //Euphoria will now know event time for each event + // Euphoria will now know event time for each event pipeline.run(); } @@ -530,7 +531,7 @@ public class DocumentationExamplesTest { // suppose nums contains: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] PCollection<Integer> divisibleBythree = Filter.named("divisibleByFive").of(nums).by(e -> e % 3 == 0).output(); - //divisibleBythree will contain: [ 0, 3, 6, 9] + // divisibleBythree will contain: [ 0, 3, 6, 9] PAssert.that(divisibleBythree).containsInAnyOrder(0, 3, 6, 9); pipeline.run(); @@ -542,7 +543,7 @@ public class DocumentationExamplesTest { PCollection<String> animals = pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); - //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] + // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = ReduceByKey.named("to-letters-couts") .of(animals) @@ -551,7 +552,8 @@ public class DocumentationExamplesTest { .valueBy(e -> 1) .reduceBy(Stream::count) .output(); - // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ] + // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, + // 1L), KV.of(8, 1L) ] PAssert.that(countOfAnimalNamesByLength) .containsInAnyOrder( @@ -566,7 +568,7 @@ public class DocumentationExamplesTest { PCollection<String> animals = pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); - //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] + // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = ReduceByKey.named("to-letters-couts") .of(animals) @@ -575,7 +577,8 @@ public class DocumentationExamplesTest { .valueBy(e -> 1L) .combineBy(s -> s.mapToLong(l -> l).sum()) .output(); - // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ] + // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, + // 1L), KV.of(8, 1L) ] PAssert.that(countOfAnimalNamesByLength) .containsInAnyOrder( @@ -590,7 +593,7 @@ public class DocumentationExamplesTest { PCollection<String> animals = pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); - //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] + // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = ReduceByKey.named("to-letters-couts") .of(animals) @@ -603,7 +606,8 @@ public class DocumentationExamplesTest { collector.asContext().getCounter("num-of-keys").increment(); }) .output(); - // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ] + // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, + // 1L), KV.of(8, 1L) ] PAssert.that(countOfAnimalNamesByLength) .containsInAnyOrder( @@ -659,7 +663,7 @@ public class DocumentationExamplesTest { PCollection<String> animals = pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck")); - //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] + // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"] PCollection<KV<Integer, Long>> countOfAnimalNamesByLength = ReduceByKey.named("to-letters-couts") .of(animals) @@ -668,7 +672,8 @@ public class DocumentationExamplesTest { .valueBy(e -> 1L) .combineBy(Fold.of((l1, l2) -> l1 + l2)) .output(); - // countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ] + // countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, + // 1L), KV.of(8, 1L) ] PAssert.that(countOfAnimalNamesByLength) .containsInAnyOrder( @@ -681,7 +686,7 @@ public class DocumentationExamplesTest { public void testSumByKeyOperator() { PCollection<Integer> input = pipeline.apply(Create.of(asList(1, 2, 3, 4, 5, 6, 7, 8, 9))); - //suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ] + // suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ] PCollection<KV<Integer, Long>> output = SumByKey.named("sum-odd-and-even") .of(input) @@ -707,11 +712,12 @@ public class DocumentationExamplesTest { .apply("rodents", Create.of("squirrel", "mouse", "rat", "lemming", "beaver")) .setTypeDescriptor(TypeDescriptors.strings()); - //suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ] - //suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ] + // suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ] + // suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ] PCollection<String> animals = Union.named("to-animals").of(cats, rodents).output(); - // animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver" + // animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", + // "lemming", "beaver" PAssert.that(animals) .containsInAnyOrder( "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"); @@ -732,7 +738,8 @@ public class DocumentationExamplesTest { new SomeEventObject(3), new SomeEventObject(4)))); - // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp + // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods + // returns time-stamp PCollection<SomeEventObject> timeStampedEvents = AssignEventTime.named("extract-event-time") .of(events) @@ -795,17 +802,18 @@ public class DocumentationExamplesTest { "duck", "caterpillar")); - // suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ] + // suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", + // "duck", "caterpillar" ] PCollection<Triple<Character, String, Integer>> longestNamesByLetter = TopPerKey.named("longest-animal-names") .of(animals) .keyBy(name -> name.charAt(0)) // first character is the key .valueBy(UnaryFunction.identity()) // value type is the same as input element type - .scoreBy( - String - ::length) // length defines score, note that Integer implements Comparable<Integer> + .scoreBy(String::length) // length defines score, note that Integer implements + // Comparable<Integer> .output(); - // longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ] + // longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", + // 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ] PAssert.that(longestNamesByLetter) .containsInAnyOrder( diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java index 991879d..b183799 100644 --- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java +++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java @@ -17,40 +17,40 @@ */ package org.apache.beam.sdk.extensions.euphoria.core.testkit; // -//import static org.junit.Assert.assertEquals; -// -//import java.time.Instant; -//import java.util.Arrays; -//import java.util.List; -//import java.util.Objects; -//import java.util.concurrent.atomic.AtomicBoolean; -//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset; -//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval; -//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing; -//import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.State; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateContext; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorage; -//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor; -//import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums; -//import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple; -//import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.AbstractOperatorTest; -//import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.Processing; -//import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -//import org.apache.beam.sdk.transforms.windowing.FixedWindows; -//import org.apache.beam.sdk.values.KV; -//import org.junit.Test; -// -///** Tests capabilities of {@link Windowing}. */ -//@Processing(Processing.Type.ALL) -//public class WindowingTest extends AbstractOperatorTest { +// import static org.junit.Assert.assertEquals; +// +// import java.time.Instant; +// import java.util.Arrays; +// import java.util.List; +// import java.util.Objects; +// import java.util.concurrent.atomic.AtomicBoolean; +// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset; +// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval; +// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing; +// import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.State; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateContext; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorage; +// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor; +// import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums; +// import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple; +// import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.AbstractOperatorTest; +// import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.Processing; +// import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +// import org.apache.beam.sdk.transforms.windowing.FixedWindows; +// import org.apache.beam.sdk.values.KV; +// import org.junit.Test; +// +/// ** Tests capabilities of {@link Windowing}. */ +// @Processing(Processing.Type.ALL) +// public class WindowingTest extends AbstractOperatorTest { // // static final AtomicBoolean ON_CLEAR_VALIDATED = new AtomicBoolean(false); // @@ -270,7 +270,8 @@ package org.apache.beam.sdk.extensions.euphoria.core.testkit; // // extract window timestamp // return FlatMap.of(keyValues) // .using( -// (KV<String, Integer> in, Collector<Triple<Instant, Instant, Integer>> out) -> { +// (KV<String, Integer> in, Collector<Triple<Instant, Instant, Integer>> out) -> +// { // long windowBegin = ((TimeInterval) out.getWindow()).getStartMillis(); // long windowEnd = ((TimeInterval) out.getWindow()).getEndMillis(); // out.collect( @@ -485,4 +486,4 @@ package org.apache.beam.sdk.extensions.euphoria.core.testkit; // } // } // */ -//} +// } diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java index daff062..1e4d6e8 100644 --- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java +++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java @@ -55,18 +55,19 @@ public class BroadcastHashJoinTranslatorTest { // create input to be broadcast PCollection<KV<Integer, String>> lengthStrings = - p.apply("names", - Create.of(KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "three"))) - .setTypeDescriptor( - TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings())); + p.apply("names", Create.of(KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "three"))) + .setTypeDescriptor( + TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings())); UnaryFunction<KV<Integer, String>, Integer> sharedKeyExtractor = KV::getKey; // other datasets to be joined with PCollection<String> letters = - p.apply("letters", Create.of("a", "b", "c", "d")).setTypeDescriptor(TypeDescriptors.strings()); + p.apply("letters", Create.of("a", "b", "c", "d")) + .setTypeDescriptor(TypeDescriptors.strings()); PCollection<String> acronyms = - p.apply("acronyms", Create.of("B2K", "DIY", "FKA", "EOBD")).setTypeDescriptor(TypeDescriptors.strings()); + p.apply("acronyms", Create.of("B2K", "DIY", "FKA", "EOBD")) + .setTypeDescriptor(TypeDescriptors.strings()); PCollection<KV<Integer, String>> lettersJoined = LeftJoin.named("join-letters-with-lengths") @@ -88,7 +89,6 @@ public class BroadcastHashJoinTranslatorTest { TypeDescriptors.strings()) .output(); - PAssert.that(lettersJoined) .containsInAnyOrder( KV.of(1, "a-one"), KV.of(1, "b-one"), KV.of(1, "c-one"), KV.of(1, "d-one")); diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index 6caf760..0e9127a 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -106,7 +106,7 @@ public class JdbcIOTest implements Serializable { derbyServer.ping(); started = true; } catch (Throwable t) { - //ignore, still trying to start + // ignore, still trying to start } } } diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 76139ef..b08a848 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -360,7 +360,8 @@ public class JmsIOTest { session.close(); connection.close(); - // create a JmsIO.Read with a decorated ConnectionFactory which will introduce a delay in sending + // create a JmsIO.Read with a decorated ConnectionFactory which will introduce a delay in + // sending // acknowledgements - this should help uncover threading issues around checkpoint management. JmsIO.Read spec = JmsIO.read() @@ -383,7 +384,8 @@ public class JmsIOTest { // the messages are still pending in the queue (no ACK yet) assertEquals(messagesToProcess, count(QUEUE)); - // we finalize the checkpoint for the already-processed messages while simultaneously consuming the remainder of + // we finalize the checkpoint for the already-processed messages while simultaneously consuming + // the remainder of // messages from the queue Thread runner = new Thread( @@ -399,7 +401,8 @@ public class JmsIOTest { runner.start(); reader.getCheckpointMark().finalizeCheckpoint(); - // Concurrency issues would cause an exception to be thrown before this method exits, failing the test + // Concurrency issues would cause an exception to be thrown before this method exits, failing + // the test runner.join(); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java index 41efb69..bd46f71 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java @@ -114,7 +114,7 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> { @Override public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value) { return kvCoder.isRegisterByteSizeObserverCheap(value.getKV()); - //TODO : do we have to implement getEncodedSize()? + // TODO : do we have to implement getEncodedSize()? } @SuppressWarnings("unchecked") diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 22f595f..ee058aa 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -268,8 +268,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { } // Return minimum watermark among partitions. - return partitionStates - .stream() + return partitionStates.stream() .map(PartitionState::updateAndGetWatermark) .min(Comparator.naturalOrder()) .get(); @@ -279,8 +278,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { public CheckpointMark getCheckpointMark() { reportBacklog(); return new KafkaCheckpointMark( - partitionStates - .stream() + partitionStates.stream() .map( p -> new PartitionMark( @@ -394,7 +392,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { private static final long UNINITIALIZED_OFFSET = -1; - //Add SpEL instance to cover the interface difference of Kafka client + // Add SpEL instance to cover the interface difference of Kafka client private transient ConsumerSpEL consumerSpEL; /** watermark before any records have been read. */ @@ -604,9 +602,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> { LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark); consumer.commitSync( - checkpointMark - .getPartitions() - .stream() + checkpointMark.getPartitions().stream() .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET) .collect( Collectors.toMap( diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java index e1845d3..5fbe272 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java @@ -42,8 +42,7 @@ public class CustomTimestampPolicyWithLimitedDelayTest { private static List<Long> getTimestampsForRecords( TimestampPolicy<String, String> policy, Instant now, List<Long> timestampOffsets) { - return timestampOffsets - .stream() + return timestampOffsets.stream() .map( ts -> { Instant result = diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 6d25b42..63e14a3 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -238,9 +238,7 @@ public class KafkaIOTest { @Override public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes( Map<TopicPartition, Long> timestampsToSearch) { - return timestampsToSearch - .entrySet() - .stream() + return timestampsToSearch.entrySet().stream() .map( e -> { // In test scope, timestamp == offset. @@ -283,10 +281,11 @@ public class KafkaIOTest { if (config.get("inject.error.at.eof") != null) { consumer.setException(new KafkaException("Injected error in consumer.poll()")); } - // MockConsumer.poll(timeout) does not actually wait even when there aren't any records. + // MockConsumer.poll(timeout) does not actually wait even when there aren't any + // records. // Add a small wait here in order to avoid busy looping in the reader. Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); - //TODO: BEAM-4086: testUnboundedSourceWithoutBoundedWrapper() occasionally hangs + // TODO: BEAM-4086: testUnboundedSourceWithoutBoundedWrapper() occasionally hangs // without this wait. Need to look into it. } consumer.schedulePollTask(this); @@ -1583,11 +1582,13 @@ public class KafkaIOTest { producerKey = String.valueOf(ThreadLocalRandom.current().nextLong()); mockProducer = new MockProducer<Integer, Long>( - false, // disable synchronous completion of send. see ProducerSendCompletionThread below. + false, // disable synchronous completion of send. see ProducerSendCompletionThread + // below. new IntegerSerializer(), new LongSerializer()) { - // override flush() so that it does not complete all the waiting sends, giving a chance to + // override flush() so that it does not complete all the waiting sends, giving a chance + // to // ProducerCompletionThread to inject errors. @Override diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java index 1bb971e..362f5d3 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java @@ -62,8 +62,7 @@ class DynamicCheckpointGenerator implements CheckpointGenerator { shardsAtStartingPoint, startingPoint.getTimestamp()); return new KinesisReaderCheckpoint( - shardsAtStartingPoint - .stream() + shardsAtStartingPoint.stream() .map(shard -> new ShardCheckpoint(streamName, shard.getShardId(), startingPoint)) .collect(Collectors.toList())); } diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java index 8a9afd8..6fefb43 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java @@ -35,8 +35,7 @@ class GetKinesisRecordsResult { final String streamName, final String shardId) { this.records = - records - .stream() + records.stream() .map( input -> { assert input != null; // to make FindBugs happy diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java index 6b5d841..3f41639 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java @@ -215,9 +215,7 @@ class ShardReadersPool { KinesisReaderCheckpoint getCheckpointMark() { ImmutableMap<String, ShardRecordsIterator> currentShardIterators = shardIteratorsMap.get(); return new KinesisReaderCheckpoint( - currentShardIterators - .values() - .stream() + currentShardIterators.values().stream() .map( shardRecordsIterator -> { checkArgument( diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java index 5e3597d..4749d2f 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java @@ -159,8 +159,7 @@ class AmazonKinesisMock implements AmazonKinesis { @Override public AmazonKinesis getKinesisClient() { return new AmazonKinesisMock( - shardedData - .stream() + shardedData.stream() .map(testDatas -> transform(testDatas, TestData::convertToRecord)) .collect(Collectors.toList()), numberOfRecordsPerGet); diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java index 69394bc..126b706 100644 --- a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java +++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java @@ -322,9 +322,7 @@ public class KuduIO { } else { Stream<BoundedSource<T>> sources = - spec.getKuduService() - .createTabletScanners(spec) - .stream() + spec.getKuduService().createTabletScanners(spec).stream() .map(s -> new KuduIO.KuduSource<T>(spec, spec.getCoder(), s)); return sources.collect(Collectors.toList()); } diff --git a/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java b/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java index d403c38..13b9240 100644 --- a/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java +++ b/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java @@ -103,8 +103,7 @@ public class RabbitMqIOTest implements Serializable { new String(message.getBody(), StandardCharsets.UTF_8))); List<String> records = - generateRecords(maxNumRecords) - .stream() + generateRecords(maxNumRecords).stream() .map(record -> new String(record, StandardCharsets.UTF_8)) .collect(Collectors.toList()); PAssert.that(output).containsInAnyOrder(records); @@ -149,8 +148,7 @@ public class RabbitMqIOTest implements Serializable { new String(message.getBody(), StandardCharsets.UTF_8))); List<String> records = - generateRecords(maxNumRecords) - .stream() + generateRecords(maxNumRecords).stream() .map(record -> new String(record, StandardCharsets.UTF_8)) .collect(Collectors.toList()); PAssert.that(output).containsInAnyOrder(records); @@ -201,8 +199,7 @@ public class RabbitMqIOTest implements Serializable { public void testWriteQueue() throws Exception { final int maxNumRecords = 1000; List<RabbitMqMessage> data = - generateRecords(maxNumRecords) - .stream() + generateRecords(maxNumRecords).stream() .map(bytes -> new RabbitMqMessage(bytes)) .collect(Collectors.toList()); p.apply(Create.of(data)) @@ -245,8 +242,7 @@ public class RabbitMqIOTest implements Serializable { public void testWriteExchange() throws Exception { final int maxNumRecords = 1000; List<RabbitMqMessage> data = - generateRecords(maxNumRecords) - .stream() + generateRecords(maxNumRecords).stream() .map(bytes -> new RabbitMqMessage(bytes)) .collect(Collectors.toList()); p.apply(Create.of(data)) diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java index b707f6a..eae04bc 100644 --- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java +++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java @@ -143,8 +143,7 @@ public class SyntheticBoundedSource extends OffsetBasedSource<KV<byte[], byte[]> : sourceOptions.forceNumInitialBundles; List<SyntheticBoundedSource> res = - bundleSplitter - .getBundleSizes(desiredNumBundles, this.getStartOffset(), this.getEndOffset()) + bundleSplitter.getBundleSizes(desiredNumBundles, this.getStartOffset(), this.getEndOffset()) .stream() .map(offsetRange -> createSourceForSubrange(offsetRange.getFrom(), offsetRange.getTo())) .collect(Collectors.toList()); diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java index a76b493..f9b4db2 100644 --- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java +++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java @@ -99,9 +99,7 @@ public class SyntheticUnboundedSource : desiredNumSplits; List<SyntheticUnboundedSource> splits = - bundleSplitter - .getBundleSizes(desiredNumBundles, startOffset, endOffset) - .stream() + bundleSplitter.getBundleSizes(desiredNumBundles, startOffset, endOffset).stream() .map( offsetRange -> new SyntheticUnboundedSource( diff --git a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java index a572661..1a30ee4 100644 --- a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java +++ b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java @@ -64,8 +64,7 @@ public class BundleSplitterTest { List<OffsetRange> bundleSizes = splitter.getBundleSizes(4, 0, options.numRecords); - bundleSizes - .stream() + bundleSizes.stream() .map(range -> range.getTo() - range.getFrom()) .forEach(size -> assertEquals(expectedBundleSize, size.intValue())); } @@ -79,8 +78,7 @@ public class BundleSplitterTest { List<OffsetRange> bundleSizes = splitter.getBundleSizes(4, 0, options.numRecords); - bundleSizes - .stream() + bundleSizes.stream() .map(range -> range.getTo() - range.getFrom()) .forEach(size -> assertEquals(expectedBundleSize, size.intValue())); } diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 3c0309d..0075190 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -222,7 +222,7 @@ public class TikaIO { } Metadata metadata = getInputMetadata(); if (metadata != null) { - //TODO: use metadata.toString() only without a trim() once Apache Tika 1.17 gets released + // TODO: use metadata.toString() only without a trim() once Apache Tika 1.17 gets released builder.add( DisplayData.item("inputMetadata", metadata.toString().trim()) .withLabel("Input Metadata")); diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java index a6f5505..1d0e019 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java @@ -715,9 +715,7 @@ public class NexmarkUtils { break; case CSV: FileSystems.delete( - FileSystems.match(config.sideInputUrl + "*") - .metadata() - .stream() + FileSystems.match(config.sideInputUrl + "*").metadata().stream() .map(metadata -> metadata.resourceId()) .collect(Collectors.toList())); break; diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java index c5a3395..0c37e15 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java @@ -142,9 +142,7 @@ public class PubsubHelper { /** Does topic corresponding to short name exist? */ public boolean topicExists(String shortTopic) throws IOException { TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic); - return pubsubClient - .listTopics(PubsubClient.projectPathFromId(project)) - .stream() + return pubsubClient.listTopics(PubsubClient.projectPathFromId(project)).stream() .anyMatch(topic::equals); } @@ -199,9 +197,7 @@ public class PubsubHelper { TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic); SubscriptionPath subscription = PubsubClient.subscriptionPathFromName(project, shortSubscription); - return pubsubClient - .listSubscriptions(PubsubClient.projectPathFromId(project), topic) - .stream() + return pubsubClient.listSubscriptions(PubsubClient.projectPathFromId(project), topic).stream() .anyMatch(subscription::equals); } diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java index a4ded8d..89b0cc6 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java @@ -118,7 +118,7 @@ public class Query10 extends NexmarkQueryTransform<Done> { /** Return channel for writing bytes to GCS. */ private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) throws IOException { - //TODO + // TODO // Fix after PR: right now this is a specific Google added use case // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way. throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory"); diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java index f353087..05d7bf3 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java @@ -247,7 +247,7 @@ public class Query3 extends NexmarkQueryTransform<NameCityStateId> { } // Remember this person for any future auctions. personState.write(newPerson); - //set a time out to clear this state + // set a time out to clear this state Instant firingTime = new Instant(newPerson.dateTime).plus(Duration.standardSeconds(maxAuctionsWaitingTime)); timer.set(firingTime); diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java index 688d4dc..b327141 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java @@ -101,16 +101,14 @@ public class SessionSideInputJoinModel extends NexmarkQueryModel<Bid> { Instant sessionStart = Ordering.<Instant>natural() .min( - session - .stream() + session.stream() .<Instant>map(tsv -> tsv.getTimestamp()) .collect(Collectors.toList())); Instant sessionEnd = Ordering.<Instant>natural() .max( - session - .stream() + session.stream() .<Instant>map(tsv -> tsv.getTimestamp()) .collect(Collectors.toList())) .plus(configuration.sessionGap); diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java index 42fd23d..396406a 100644 --- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java +++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java @@ -79,9 +79,7 @@ public class BigQueryClient { if (client.getTable(tableId, FIELD_OPTIONS) == null) { List<Field> schemaFields = - schema - .entrySet() - .stream() + schema.entrySet().stream() .map(entry -> Field.of(entry.getKey(), LegacySQLTypeName.valueOf(entry.getValue()))) .collect(Collectors.toList()); diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java index e7203e3..7bee2f1 100644 --- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java +++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java @@ -50,10 +50,7 @@ public class BigQueryResultsPublisher { } private Map<String, Object> getRowOfSchema(TestResult result) { - return result - .toMap() - .entrySet() - .stream() + return result.toMap().entrySet().stream() .filter(element -> schema.containsKey(element.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); }