This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a96277090358d2237aab7426ebde7fdf1eaccbe4 Author: Niels Basjes <nbas...@bol.com> AuthorDate: Wed Aug 15 14:05:39 2018 +0200 [FLINK-9311] [pubsub] Add unit and integration tests for PubSub connectors --- flink-connectors/flink-connector-pubsub/pom.xml | 86 +++---- .../flink/streaming/connectors/pubsub/Bound.java | 17 ++ .../connectors/pubsub/BoundedPubSubSource.java | 33 ++- .../streaming/connectors/pubsub/PubSubSink.java | 141 +++++------ .../streaming/connectors/pubsub/PubSubSource.java | 40 ++-- .../connectors/pubsub/SubscriberWrapper.java | 22 +- .../common/SerializableCredentialsProvider.java | 11 +- .../streaming/connectors/pubsub/BoundTest.java | 20 +- .../connectors/pubsub/BoundedPubSubSourceTest.java | 25 +- .../connectors/pubsub/PubSubSourceTest.java | 5 +- .../src/test/resources/log4j-test.properties | 24 ++ .../flink-connector-pubsub-emulator-tests}/pom.xml | 112 +++++---- .../connectors/pubsub/CheckPubSubEmulatorTest.java | 115 +++++++++ .../connectors/pubsub/EmulatedPubSubSinkTest.java | 109 +++++++++ .../pubsub/EmulatedPubSubSourceTest.java | 116 +++++++++ .../pubsub/emulator/GCloudEmulatorManager.java | 264 +++++++++++++++++++++ .../pubsub/emulator/GCloudUnitTestBase.java | 84 +++++++ .../connectors/pubsub/emulator/PubsubHelper.java | 232 ++++++++++++++++++ .../src/test/resources/log4j-test.properties | 24 ++ flink-end-to-end-tests/pom.xml | 1 + flink-end-to-end-tests/run-nightly-tests.sh | 2 + .../test-scripts/test_streaming_pubsub.sh | 22 ++ .../streaming/examples/pubsub/PubSubExample.java | 7 +- 23 files changed, 1309 insertions(+), 203 deletions(-) diff --git a/flink-connectors/flink-connector-pubsub/pom.xml b/flink-connectors/flink-connector-pubsub/pom.xml index a6e8d72..f50cbdd 100644 --- a/flink-connectors/flink-connector-pubsub/pom.xml +++ b/flink-connectors/flink-connector-pubsub/pom.xml @@ -35,12 +35,20 @@ under the License. 
<packaging>jar</packaging> - <properties> - <pubsub.version>1.37.1</pubsub.version> - </properties> + <!-- This is the way we get a consistent set of versions of the Google tools --> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.google.cloud</groupId> + <artifactId>google-cloud-bom</artifactId> + <version>0.53.0-alpha</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> <dependencies> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> @@ -51,7 +59,29 @@ under the License. <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-pubsub</artifactId> - <version>${pubsub.version}</version> + <!-- Version is pulled from google-cloud-bom --> + <exclusions> + <!-- Exclude an old version of guava that is being pulled + in by a transitive dependency of google-api-client --> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava-jdk5</artifactId> + </exclusion> + </exclusions> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> </dependency> <dependency> @@ -69,51 +99,5 @@ under the License. 
<type>test-jar</type> <scope>test</scope> </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <shadeTestJar>false</shadeTestJar> - <artifactSet> - <includes> - <include>*:*</include> - </includes> - </artifactSet> - <relocations> - <relocation> - <pattern>com.google.guava</pattern> - <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern> - </relocation> - <relocation> - <pattern>com.google.common.base</pattern> - <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern> - </relocation> - <relocation> - <pattern>io.grpc.auth</pattern> - <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern> - </relocation> - <relocation> - <pattern>io.grpc.protobuf</pattern> - <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </project> diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java index b37cc45..727f32e 100644 --- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java +++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.streaming.connectors.pubsub; import org.apache.flink.streaming.api.functions.source.SourceFunction; diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java index 5f829ae..83fc15e 100644 --- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java +++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.streaming.connectors.pubsub; import com.google.cloud.pubsub.v1.AckReplyConsumer; @@ -5,7 +22,11 @@ import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; -class BoundedPubSubSource<OUT> extends PubSubSource<OUT> { +/** + * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. For example, after a period of being idle or after a given number of messages has been received. + * + */ +public class BoundedPubSubSource<OUT> extends PubSubSource<OUT> { private Bound<OUT> bound; private BoundedPubSubSource() { @@ -28,11 +49,19 @@ class BoundedPubSubSource<OUT> extends PubSubSource<OUT> { bound.receivedMessage(); } + /** + * Creates a {@link BoundedPubSubSourceBuilder}. + * @param <OUT> Type of Object which will be read by the produced {@link BoundedPubSubSource} + */ @SuppressWarnings("unchecked") public static <OUT> BoundedPubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends BoundedPubSubSourceBuilder> newBuilder() { return new BoundedPubSubSourceBuilder<>(new BoundedPubSubSource<OUT>()); } + /** + * Builder to create BoundedPubSubSource. 
+ * @param <OUT> Type of Object which will be read by the BoundedPubSubSource + */ @SuppressWarnings("unchecked") public static class BoundedPubSubSourceBuilder<OUT, PSS extends BoundedPubSubSource<OUT>, BUILDER extends BoundedPubSubSourceBuilder<OUT, PSS, BUILDER>> extends PubSubSourceBuilder<OUT, PSS, BUILDER> { private Long boundedByAmountOfMessages; @@ -52,7 +81,7 @@ class BoundedPubSubSource<OUT> extends PubSubSource<OUT> { return (BUILDER) this; } - private Bound <OUT> createBound() { + private Bound<OUT> createBound() { if (boundedByAmountOfMessages != null && boundedByTimeSinceLastMessage != null) { return Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(boundedByAmountOfMessages, boundedByTimeSinceLastMessage); } diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java index 92c9c6c..e6ac53e 100644 --- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java +++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.connectors.pubsub; -import com.google.api.gax.core.NoCredentialsProvider; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -40,22 +39,35 @@ import java.io.IOException; /** * A sink function that outputs to PubSub. 
+ * * @param <IN> type of PubSubSink messages to write */ public class PubSubSink<IN> extends RichSinkFunction<IN> { - private final SerializableCredentialsProvider serializableCredentialsProvider; - private final SerializationSchema<IN> serializationSchema; - private final String projectName; - private final String topicName; - private String hostAndPort = null; + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema<IN> serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; private transient Publisher publisher; - public PubSubSink(SerializableCredentialsProvider serializableCredentialsProvider, SerializationSchema<IN> serializationSchema, String projectName, String topicName) { + private PubSubSink() { + } + + void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) { this.serializableCredentialsProvider = serializableCredentialsProvider; + } + + void setSerializationSchema(SerializationSchema<IN> serializationSchema) { this.serializationSchema = serializationSchema; + } + + void setProjectName(String projectName) { this.projectName = projectName; + } + + void setTopicName(String topicName) { this.topicName = topicName; } @@ -64,15 +76,29 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> { * The ONLY reason to use this is during tests with the emulator provided by Google. 
* * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") - * @return The current instance */ - public PubSubSink<IN> withHostAndPort(String hostAndPort) { + void withHostAndPort(String hostAndPort) { this.hostAndPort = hostAndPort; - return this; } - private transient ManagedChannel managedChannel = null; - private transient TransportChannel channel = null; + void initialize() throws IOException { + if (serializableCredentialsProvider == null) { + serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables(); + } + if (serializationSchema == null) { + throw new IllegalArgumentException("The serializationSchema has not been specified."); + } + if (projectName == null) { + throw new IllegalArgumentException("The projectName has not been specified."); + } + if (topicName == null) { + throw new IllegalArgumentException("The topicName has not been specified."); + } + } + + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; @Override public void open(Configuration configuration) throws Exception { @@ -114,127 +140,110 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> { /** * Create a builder for a new PubSubSink. + * * @param <IN> The generic of the type that is to be written into the sink. * @return a new PubSubSinkBuilder instance */ - public static <IN> PubSubSinkBuilder<IN> newBuilder() { - return new PubSubSinkBuilder<>(); + public static <IN> PubSubSinkBuilder<IN, ? extends PubSubSink<IN>, ? extends PubSubSinkBuilder<IN, ?, ?>> newBuilder() { + return new PubSubSinkBuilder<>(new PubSubSink<>()); } /** * PubSubSinkBuilder to create a PubSubSink. + * * @param <IN> Type of PubSubSink to create. 
*/ - public static class PubSubSinkBuilder<IN> { - private SerializableCredentialsProvider serializableCredentialsProvider = null; - private SerializationSchema<IN> serializationSchema = null; - private String projectName = null; - private String topicName = null; - private String hostAndPort = null; - - private PubSubSinkBuilder() { + @SuppressWarnings("unchecked") + public static class PubSubSinkBuilder<IN, PSS extends PubSubSink<IN>, BUILDER extends PubSubSinkBuilder<IN, PSS, BUILDER>> { + protected PSS sinkUnderConstruction; + + private PubSubSinkBuilder(PSS sinkUnderConstruction) { + this.sinkUnderConstruction = sinkUnderConstruction; } /** * Set the credentials. * If this is not used then the credentials are picked up from the environment variables. + * * @param credentials the Credentials needed to connect. * @return The current PubSubSinkBuilder instance */ - public PubSubSinkBuilder<IN> withCredentials(Credentials credentials) { - this.serializableCredentialsProvider = new SerializableCredentialsProvider(credentials); - return this; + public BUILDER withCredentials(Credentials credentials) { + sinkUnderConstruction.setSerializableCredentialsProvider(new SerializableCredentialsProvider(credentials)); + return (BUILDER) this; } /** * Set the CredentialsProvider. * If this is not used then the credentials are picked up from the environment variables. + * * @param credentialsProvider the custom SerializableCredentialsProvider instance. * @return The current PubSubSinkBuilder instance */ - public PubSubSinkBuilder<IN> withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException { + public BUILDER withCredentialsProvider(CredentialsProvider credentialsProvider) throws IOException { return withCredentials(credentialsProvider.getCredentials()); } /** * Set the credentials to be absent. * This means that no credentials are to be used at all. 
+ * * @return The current PubSubSinkBuilder instance */ - public PubSubSinkBuilder<IN> withoutCredentials() { - this.serializableCredentialsProvider = SerializableCredentialsProvider.withoutCredentials(); - return this; + public BUILDER withoutCredentials() { + sinkUnderConstruction.setSerializableCredentialsProvider(SerializableCredentialsProvider.withoutCredentials()); + return (BUILDER) this; } /** * @param serializationSchema Instance of a SerializationSchema that converts the IN into a byte[] * @return The current PubSubSinkBuilder instance */ - public PubSubSinkBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema) { - this.serializationSchema = serializationSchema; - return this; + public BUILDER withSerializationSchema(SerializationSchema<IN> serializationSchema) { + sinkUnderConstruction.setSerializationSchema(serializationSchema); + return (BUILDER) this; } /** * @param projectName The name of the project in PubSub * @return The current PubSubSinkBuilder instance */ - public PubSubSinkBuilder<IN> withProjectName (String projectName) { - this.projectName = projectName; - return this; + public BUILDER withProjectName(String projectName) { + sinkUnderConstruction.setProjectName(projectName); + return (BUILDER) this; } /** * @param topicName The name of the topic in PubSub * @return The current PubSubSinkBuilder instance */ - public PubSubSinkBuilder<IN> withTopicName (String topicName) { - this.topicName = topicName; - return this; + public BUILDER withTopicName(String topicName) { + sinkUnderConstruction.setTopicName(topicName); + return (BUILDER) this; } - /** * Set the custom hostname/port combination of PubSub. * The ONLY reason to use this is during tests with the emulator provided by Google. 
+ * * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") * @return The current PubSubSinkBuilder instance */ - public PubSubSinkBuilder<IN> withHostAndPort(String hostAndPort) { - this.hostAndPort = hostAndPort; - return this; + public BUILDER withHostAndPort(String hostAndPort) { + sinkUnderConstruction.withHostAndPort(hostAndPort); + return (BUILDER) this; } /** * Actually builder the desired instance of the PubSubSink. + * * @return a brand new PubSubSink - * @throws IOException incase of a problem getting the credentials + * @throws IOException incase of a problem getting the credentials * @throws IllegalArgumentException incase required fields were not specified. */ public PubSubSink<IN> build() throws IOException { - if (serializableCredentialsProvider == null) { - serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables(); - } - if (serializationSchema == null) { - throw new IllegalArgumentException("The serializationSchema has not been specified."); - } - if (projectName == null) { - throw new IllegalArgumentException("The projectName has not been specified."); - } - if (topicName == null) { - throw new IllegalArgumentException("The topicName has not been specified."); - } - - PubSubSink<IN> pubSubSink = new PubSubSink<>( - serializableCredentialsProvider, - serializationSchema, - projectName, topicName); - - if (hostAndPort != null) { - pubSubSink.withHostAndPort(hostAndPort); - } - - return pubSubSink; + sinkUnderConstruction.initialize(); + return sinkUnderConstruction; } } diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java index 8f8c689..2d93998 100644 --- 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java +++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java @@ -43,7 +43,7 @@ import java.util.List; */ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, AckReplyConsumer> implements MessageReceiver, ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT> { private DeserializationSchema<OUT> deserializationSchema; - private SubscriberWrapper subscriberWrapper; + private SubscriberWrapper subscriberWrapper; protected transient SourceContext<OUT> sourceContext = null; @@ -64,7 +64,8 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase super.open(configuration); subscriberWrapper.initialize(this); if (hasNoCheckpointingEnabled(getRuntimeContext())) { - throw new IllegalArgumentException("Checkpointing needs to be enabled to support: PubSub ATLEAST_ONCE"); + throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " + + "the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message."); } } @@ -123,26 +124,27 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase return deserializationSchema.getProducedType(); } - @SuppressWarnings("unchecked") - public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource, ? extends PubSubSourceBuilder> newBuilder() { - return new PubSubSourceBuilder<>(new PubSubSource<OUT>()); + public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource<OUT>, ? extends PubSubSourceBuilder<OUT, ?, ?>> newBuilder() { + return new PubSubSourceBuilder<>(new PubSubSource<>()); } /** * Builder to create PubSubSource. 
- * @param <OUT> The type of objects which will be read - * @param <PSS> The type of PubSubSource + * + * @param <OUT> The type of objects which will be read + * @param <PSS> The type of PubSubSource * @param <BUILDER> The type of Builder to create the PubSubSource */ + @SuppressWarnings("unchecked") public static class PubSubSourceBuilder<OUT, PSS extends PubSubSource<OUT>, BUILDER extends PubSubSourceBuilder<OUT, PSS, BUILDER>> { - protected PSS sourceUnderConstruction; + protected PSS sourceUnderConstruction; - private SubscriberWrapper subscriberWrapper = null; + private SubscriberWrapper subscriberWrapper = null; private SerializableCredentialsProvider serializableCredentialsProvider; - private DeserializationSchema<OUT> deserializationSchema; - private String projectName; - private String subscriptionName; - private String hostAndPort; + private DeserializationSchema<OUT> deserializationSchema; + private String projectName; + private String subscriptionName; + private String hostAndPort; protected PubSubSourceBuilder(PSS sourceUnderConstruction) { this.sourceUnderConstruction = sourceUnderConstruction; @@ -151,6 +153,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase /** * Set the credentials. * If this is not used then the credentials are picked up from the environment variables. + * * @param credentials the Credentials needed to connect. * @return The current PubSubSourceBuilder instance */ @@ -162,6 +165,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase /** * Set the CredentialsProvider. * If this is not used then the credentials are picked up from the environment variables. + * * @param credentialsProvider the custom SerializableCredentialsProvider instance. * @return The current PubSubSourceBuilder instance */ @@ -172,6 +176,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase /** * Set the credentials to be absent. 
* This means that no credentials are to be used at all. + * * @return The current PubSubSourceBuilder instance */ public BUILDER withoutCredentials() { @@ -183,13 +188,13 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase * @param deserializationSchema Instance of a DeserializationSchema that converts the OUT into a byte[] * @return The current PubSubSourceBuilder instance */ - public BUILDER withDeserializationSchema(DeserializationSchema <OUT> deserializationSchema) { + public BUILDER withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) { this.deserializationSchema = deserializationSchema; return (BUILDER) this; } /** - * @param projectName The name of the project in GoogleCloudPlatform + * @param projectName The name of the project in GoogleCloudPlatform * @param subscriptionName The name of the subscription in PubSub * @return The current PubSubSourceBuilder instance */ @@ -202,6 +207,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase /** * Set the custom hostname/port combination of PubSub. * The ONLY reason to use this is during tests with the emulator provided by Google. + * * @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") * @return The current PubSubSourceBuilder instance */ @@ -213,6 +219,7 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase /** * Set a complete SubscriberWrapper. * The ONLY reason to use this is during tests. + * * @param subscriberWrapper The fully instantiated SubscriberWrapper * @return The current PubSubSourceBuilder instance */ @@ -223,8 +230,9 @@ public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase /** * Actually build the desired instance of the PubSubSourceBuilder. 
+ * * @return a brand new SourceFunction - * @throws IOException incase of a problem getting the credentials + * @throws IOException incase of a problem getting the credentials * @throws IllegalArgumentException incase required fields were not specified. */ public PSS build() throws IOException { diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java index fb75f43..0595877 100644 --- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java +++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java @@ -33,13 +33,13 @@ import java.io.Serializable; class SubscriberWrapper implements Serializable { private final SerializableCredentialsProvider serializableCredentialsProvider; - private final String projectId; - private final String subscriptionId; - private String hostAndPort = null; + private final String projectId; + private final String subscriptionId; + private String hostAndPort = null; - private transient Subscriber subscriber; - private transient ManagedChannel managedChannel = null; - private transient TransportChannel channel = null; + private transient Subscriber subscriber; + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; SubscriberWrapper(SerializableCredentialsProvider serializableCredentialsProvider, ProjectSubscriptionName projectSubscriptionName) { this.serializableCredentialsProvider = serializableCredentialsProvider; @@ -49,14 +49,14 @@ class SubscriberWrapper implements Serializable { void initialize(MessageReceiver messageReceiver) { Subscriber.Builder builder = Subscriber - .newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), 
messageReceiver) - .setCredentialsProvider(serializableCredentialsProvider); + .newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), messageReceiver) + .setCredentialsProvider(serializableCredentialsProvider); if (hostAndPort != null) { managedChannel = ManagedChannelBuilder - .forTarget(hostAndPort) - .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing. - .build(); + .forTarget(hostAndPort) + .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing. + .build(); channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build(); builder.setChannelProvider(FixedTransportChannelProvider.create(channel)); } diff --git a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java index bd04058..44b1fa0 100644 --- a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java +++ b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java @@ -40,7 +40,9 @@ public class SerializableCredentialsProvider implements CredentialsProvider, Ser } /** - * Creates a SerializableCredentialsProvider for a PubSubSubscription based on environment variables. {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings} + * Creates a SerializableCredentialsProvider for a PubSubSubscription based on environment variables. 
+ * {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings} + * * @return serializableCredentialsProvider * @throws IOException thrown by {@link Credentials} */ @@ -50,11 +52,12 @@ public class SerializableCredentialsProvider implements CredentialsProvider, Ser } /** - * Creates a SerializableCredentialsProvider for a PubSubSubscription without any credentials. {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings} + * Creates a SerializableCredentialsProvider for a PubSubSubscription without any credentials. + * {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings} * This is ONLY useful when running tests locally against Mockito or the Google PubSub emulator - * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a>. + * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a> * @return serializableCredentialsProvider - * @throws IOException thrown by {@link Credentials} + * @see <a href="https://cloud.google.com/pubsub/docs/emulator" target="_top">https://cloud.google.com/pubsub/docs/emulator</a> */ public static SerializableCredentialsProvider withoutCredentials() { return new SerializableCredentialsProvider(NoCredentials.getInstance()); diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java index f98731b..a340ae9 100644 --- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java +++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.streaming.connectors.pubsub; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -5,9 +22,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.junit.Test; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.internal.verification.VerificationModeFactory.times; /** * Test for {@link Bound}. 
@@ -107,6 +124,7 @@ public class BoundTest { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { + // Ignore any exceptions } } } diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java index 805f823..5f938fd 100644 --- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java +++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.streaming.connectors.pubsub; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -13,9 +30,9 @@ import org.junit.Test; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; /** * Tests for {@link BoundedPubSubSource}. @@ -63,8 +80,8 @@ public class BoundedPubSubSourceTest { private PubsubMessage pubSubMessage() { return PubsubMessage.newBuilder() - .setMessageId("message-id") - .setData(ByteString.copyFrom("some-message".getBytes())) - .build(); + .setMessageId("message-id") + .setData(ByteString.copyFrom("some-message".getBytes())) + .build(); } } diff --git a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java index 73ca53b..9db5d7d 100644 --- a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java +++ b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java @@ -42,7 +42,6 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.times; - /** * Test for {@link SourceFunction}. 
*/ @@ -142,7 +141,7 @@ public class PubSubSourceTest { private PubsubMessage pubSubMessage() { return PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(SERIALIZED_MESSAGE)) - .build(); + .setData(ByteString.copyFrom(SERIALIZED_MESSAGE)) + .build(); } } diff --git a/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..b316a9a --- /dev/null +++ b/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +log4j.rootLogger=INFO, testlogger +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.out +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger diff --git a/flink-connectors/flink-connector-pubsub/pom.xml b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml similarity index 50% copy from flink-connectors/flink-connector-pubsub/pom.xml copy to flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml index a6e8d72..7dd0d15 100644 --- a/flink-connectors/flink-connector-pubsub/pom.xml +++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml @@ -25,33 +25,75 @@ under the License. 
<parent> <groupId>org.apache.flink</groupId> - <artifactId>flink-connectors</artifactId> + <artifactId>flink-end-to-end-tests</artifactId> <version>1.7-SNAPSHOT</version> <relativePath>..</relativePath> </parent> - <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId> - <name>flink-connector-pubsub</name> + <artifactId>flink-connector-pubsub-emulator-tests</artifactId> + <name>flink-connector-pubsub-emulator-tests</name> <packaging>jar</packaging> - <properties> - <pubsub.version>1.37.1</pubsub.version> - </properties> + <!-- This is the way we get a consistent set of versions of the Google tools --> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.google.cloud</groupId> + <artifactId>google-cloud-bom</artifactId> + <version>0.53.0-alpha</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> <dependencies> + <!--All dependencies are <scope>test</scope> --> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${project.version}</version> - <scope>provided</scope> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-pubsub</artifactId> - <version>${pubsub.version}</version> + <!-- Version is pulled from google-cloud-bom --> + <exclusions> + <!-- Exclude an old version of guava that is being pulled + in by a transitive dependency of google-api-client --> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava-jdk5</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + 
<scope>test</scope> + </dependency> + + <!-- This is used to run the local PubSub --> + <dependency> + <groupId>com.spotify</groupId> + <artifactId>docker-client</artifactId> + <version>8.11.7</version> + <scope>test</scope> </dependency> <dependency> @@ -69,51 +111,39 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> - </dependencies> - <build> - <plugins> + <!-- ONLY run the tests when explicitly told to do so --> + <properties> + <skipTests>true</skipTests> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.12.4</version> + <configuration> + <skipTests>${skipTests}</skipTests> + </configuration> + </plugin> + <!-- Disabling convergence check because there are multiple problems within the used pubsub dependencies --> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> + <artifactId>maven-enforcer-plugin</artifactId> <executions> <execution> - <id>shade-flink</id> - <phase>package</phase> + <id>dependency-convergence</id> <goals> - <goal>shade</goal> + <goal>enforce</goal> </goals> <configuration> - <shadeTestJar>false</shadeTestJar> - <artifactSet> - <includes> - <include>*:*</include> - </includes> - </artifactSet> - <relocations> - <relocation> - <pattern>com.google.guava</pattern> - <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern> - </relocation> - <relocation> - <pattern>com.google.common.base</pattern> - <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern> - </relocation> - <relocation> - <pattern>io.grpc.auth</pattern> - <shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern> - </relocation> - <relocation> - <pattern>io.grpc.protobuf</pattern> - 
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern> - </relocation> - </relocations> + <skip>true</skip> </configuration> </execution> </executions> </plugin> - </plugins> - </build> + </plugins> + </build> </project> diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java new file mode 100644 index 0000000..a54d47b --- /dev/null +++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to ensure the docker image with PubSub is working correctly.
+ *
+ * <p>Both tests use the same topic and subscription, which are created once before the
+ * tests run and deleted again afterwards.
+ */
+public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CheckPubSubEmulatorTest.class);
+
+	private static final String PROJECT_NAME = "Project";
+	private static final String TOPIC_NAME = "Topic";
+	private static final String SUBSCRIPTION_NAME = "Subscription";
+
+	// Assigned in setUp() instead of a static initializer, so obtaining the helper does not
+	// depend on when this class happens to be initialized (same pattern as the other emulator tests).
+	private static PubsubHelper pubsubHelper;
+
+	/**
+	 * Obtains the PubSub helper and creates the topic and subscription used by the tests.
+	 */
+	@BeforeClass
+	public static void setUp() throws Exception {
+		pubsubHelper = getPubsubHelper();
+		pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+		pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+	}
+
+	/**
+	 * Removes the subscription and the topic again.
+	 */
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+		pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+	}
+
+	/**
+	 * Publishes a single message and verifies that a pull on the subscription returns it.
+	 */
+	@Test
+	public void testPull() throws Exception {
+		Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+		try {
+			publisher
+				.publish(PubsubMessage
+					.newBuilder()
+					.setData(ByteString.copyFromUtf8("Hello World PULL"))
+					.build())
+				.get();
+
+			List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 10);
+			assertEquals(1, receivedMessages.size());
+			assertEquals("Hello World PULL", receivedMessages.get(0).getMessage().getData().toStringUtf8());
+		} finally {
+			// Release the publisher even when one of the assertions fails
+			publisher.shutdown();
+		}
+	}
+
+	/**
+	 * Publishes a single message and verifies that a subscriber callback receives it.
+	 */
+	@Test
+	public void testPush() throws Exception {
+		// The callback is invoked asynchronously by the Subscriber while this thread reads the
+		// list, so a synchronized list is used instead of a plain ArrayList.
+		List<PubsubMessage> receivedMessages = Collections.synchronizedList(new ArrayList<>());
+		Subscriber subscriber = pubsubHelper.
+			subscribeToSubscription(
+				PROJECT_NAME,
+				SUBSCRIPTION_NAME,
+				(message, consumer) -> receivedMessages.add(message)
+			);
+
+		Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+		try {
+			publisher
+				.publish(PubsubMessage
+					.newBuilder()
+					.setData(ByteString.copyFromUtf8("Hello World"))
+					.build())
+				.get();
+
+			LOG.info("Waiting a while to receive the message...");
+			Thread.sleep(1000);
+
+			assertEquals(1, receivedMessages.size());
+			assertEquals("Hello World", receivedMessages.get(0).getData().toStringUtf8());
+		} finally {
+			// Stop the subscriber and the publisher even when one of the assertions fails
+			try {
+				subscriber.stopAsync().awaitTerminated(100, MILLISECONDS);
+			} catch (TimeoutException tme) {
+				// Yeah, whatever. Don't care about clean shutdown here.
+			}
+			publisher.shutdown();
+		}
+	}
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
new file mode 100644
index 0000000..165579f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase; +import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper; + +import com.google.pubsub.v1.ReceivedMessage; +import org.apache.commons.lang3.StringUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test of the PubSub SINK with the Google PubSub emulator. 
+ */
+public class EmulatedPubSubSinkTest extends GCloudUnitTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(EmulatedPubSubSinkTest.class);
+
+	private static final String PROJECT_NAME = "FLProject";
+	private static final String TOPIC_NAME = "FLTopic";
+	private static final String SUBSCRIPTION_NAME = "FLSubscription";
+
+	private static PubsubHelper pubsubHelper;
+
+	/**
+	 * Obtains the PubSub helper and creates the topic and subscription used by the test.
+	 */
+	@BeforeClass
+	public static void setUp() throws Exception {
+		pubsubHelper = getPubsubHelper();
+		pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+		pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+	}
+
+	/**
+	 * Removes the subscription and the topic again.
+	 */
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+		pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+	}
+
+	/**
+	 * Runs a small job that writes the reversed input strings through the PubSub sink and
+	 * verifies that every one of them can be pulled from the subscription afterwards.
+	 */
+	@Test
+	public void testFlinkSink() throws Exception {
+		final List<String> input = Arrays.asList("One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eigth", "Nine", "Ten");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// Test stream: every input element, reversed
+		final DataStream<String> reversed = env
+			.fromCollection(input)
+			.name("Test input")
+			.map((MapFunction<String, String>) StringUtils::reverse);
+
+		// Write the stream to the emulated PubSub topic
+		reversed
+			.addSink(PubSubSink.<String>newBuilder()
+				.withProjectName(PROJECT_NAME)
+				.withTopicName(TOPIC_NAME)
+				.withSerializationSchema(new SimpleStringSchema())
+				// Emulator specific settings
+				.withCredentialsProvider(getPubsubHelper().getCredentialsProvider())
+				.withHostAndPort(getPubSubHostPort())
+				.build())
+			.name("PubSub sink");
+
+		env.execute();
+
+		// Pull whatever arrived in PubSub and compare it with what was sent
+		final List<ReceivedMessage> pulled = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 100);
+
+		assertEquals("Wrong number of elements", input.size(), pulled.size());
+
+		final List<String> payloads = new ArrayList<>();
+		for (ReceivedMessage received : pulled) {
+			payloads.add(received.getMessage().getData().toStringUtf8());
+		}
+
+		for (String original : input) {
+			assertTrue("Missing " + original, payloads.contains(StringUtils.reverse(original)));
+		}
+	}
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
new file mode 100644
index 0000000..aaa4fc3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase; +import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper; + +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test of the PubSub SOURCE with the Google PubSub emulator. 
+ */
+public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(EmulatedPubSubSourceTest.class);
+
+	private static final String PROJECT_NAME = "FLProject";
+	private static final String TOPIC_NAME = "FLTopic";
+	private static final String SUBSCRIPTION_NAME = "FLSubscription";
+
+	private static PubsubHelper pubsubHelper;
+
+	/**
+	 * Obtains the PubSub helper and creates the topic and subscription used by the test.
+	 */
+	@BeforeClass
+	public static void setUp() throws Exception {
+		pubsubHelper = getPubsubHelper();
+		pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+		pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+	}
+
+	/**
+	 * Removes the subscription and the topic again.
+	 */
+	@AfterClass
+	public static void tearDown() throws Exception {
+		pubsubHelper.deleteSubscription(PROJECT_NAME, SUBSCRIPTION_NAME);
+		pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+	}
+
+	/**
+	 * Publishes a fixed set of messages, reads them back through the bounded PubSub source
+	 * and verifies that every message was received.
+	 */
+	@Test
+	public void testFlinkSource() throws Exception {
+		// Create some messages and put them into pubsub
+		List<String> input = Arrays.asList("One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eigth", "Nine", "Ten");
+
+		// Publish the messages into PubSub
+		Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+		try {
+			for (String s : input) {
+				try {
+					publisher
+						.publish(PubsubMessage
+							.newBuilder()
+							.setData(ByteString.copyFromUtf8(s))
+							.build())
+						.get();
+				} catch (ExecutionException e) {
+					// Fail right here instead of printing the stack trace and failing later
+					// on a confusing "Wrong number of elements" assertion.
+					throw new AssertionError("Unable to publish \"" + s + "\" into PubSub", e);
+				}
+			}
+		} finally {
+			// Every publish has been awaited above, so the publisher is no longer needed
+			publisher.shutdown();
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(100);
+
+		DataStream<String> fromPubSub = env
+			.addSource(BoundedPubSubSource.<String>newBuilder()
+				.withDeserializationSchema(new SimpleStringSchema())
+				.withProjectSubscriptionName(PROJECT_NAME, SUBSCRIPTION_NAME)
+				// Specific for emulator
+				.withCredentialsProvider(getPubsubHelper().getCredentialsProvider())
+				.withHostAndPort(getPubSubHostPort())
+				// Make sure the test topology self terminates
+				.boundedByTimeSinceLastMessage(1000)
+				.build())
+			.name("PubSub source");
+
+		List<String> output = new ArrayList<>();
+		fromPubSub.writeUsingOutputFormat(new LocalCollectionOutputFormat<>(output));
+
+		env.execute();
+
+		assertEquals("Wrong number of elements", input.size(), output.size());
+		for (String test : input) {
+			assertTrue("Missing " + test, output.contains(test));
+		}
+	}
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
new file mode 100644
index 0000000..27a658a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.pubsub.emulator; + +import com.spotify.docker.client.DefaultDockerClient; +import com.spotify.docker.client.DockerClient; +import com.spotify.docker.client.exceptions.ContainerNotFoundException; +import com.spotify.docker.client.exceptions.DockerCertificateException; +import com.spotify.docker.client.exceptions.DockerException; +import com.spotify.docker.client.exceptions.ImageNotFoundException; +import com.spotify.docker.client.messages.ContainerConfig; +import com.spotify.docker.client.messages.ContainerCreation; +import com.spotify.docker.client.messages.ContainerInfo; +import com.spotify.docker.client.messages.HostConfig; +import com.spotify.docker.client.messages.PortBinding; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * The class that handles the starting and stopping of the emulator docker image. 
+ */
+public class GCloudEmulatorManager {
+
+	private static final Logger LOG = LoggerFactory.getLogger(GCloudEmulatorManager.class);
+
+	private static DockerClient docker;
+
+	private static String dockerIpAddress = "127.0.0.1";
+
+	public static final String INTERNAL_PUBSUB_PORT = "22222";
+	public static final String DOCKER_IMAGE_NAME = "google/cloud-sdk:latest";
+
+	private static String pubsubPort;
+
+	/**
+	 * Returns the address on which the emulator can be reached.
+	 *
+	 * @return the IP address of the emulator
+	 * @throws IllegalStateException if the address has been wiped because starting the emulator failed
+	 */
+	public static String getDockerIpAddress() {
+		if (dockerIpAddress == null) {
+			throw new IllegalStateException("The docker has not yet been started (yet) so you cannot get the IP address yet.");
+		}
+		return dockerIpAddress;
+	}
+
+	/**
+	 * Returns the host port to which the PubSub port of the container has been mapped.
+	 *
+	 * @return the host port of the PubSub emulator
+	 * @throws IllegalStateException if no port is known (not started yet, or starting failed)
+	 */
+	public static String getDockerPubSubPort() {
+		if (pubsubPort == null) {
+			throw new IllegalStateException("The docker has not yet been started (yet) so you cannot get the port information yet.");
+		}
+		return pubsubPort;
+	}
+
+	public static final String UNITTEST_PROJECT_ID = "running-from-junit-for-flink";
+	private static final String CONTAINER_NAME_JUNIT = (DOCKER_IMAGE_NAME + "_" + UNITTEST_PROJECT_ID).replaceAll("[^a-zA-Z0-9_]", "_");
+
+	/**
+	 * Starts a fresh emulator container and waits until the PubSub emulator answers.
+	 *
+	 * <p>An already existing container with the same name is removed first. The docker image is
+	 * pulled when it is not available locally. When the emulator does not report "Ok" in time the
+	 * container is removed again and the connection information is wiped.
+	 */
+	public static void launchDocker() throws DockerException, InterruptedException, DockerCertificateException {
+		// Create a client based on DOCKER_HOST and DOCKER_CERT_PATH env vars
+		docker = DefaultDockerClient.fromEnv().build();
+
+		terminateAndDiscardAnyExistingContainers(true);
+
+		LOG.info("");
+		LOG.info("/===========================================");
+		LOG.info("| GCloud Emulator");
+
+		ContainerInfo containerInfo;
+		String id;
+
+		try {
+			docker.inspectImage(DOCKER_IMAGE_NAME);
+		} catch (ImageNotFoundException e) {
+			// No such image so we must download it first.
+			LOG.info("| - Getting docker image \"{}\"", DOCKER_IMAGE_NAME);
+			docker.pull(DOCKER_IMAGE_NAME, message -> {
+				if (message.id() != null && message.progress() != null) {
+					LOG.info("| - Downloading > {} : {}", message.id(), message.progress());
+				}
+			});
+		}
+
+		// No such container. Good, we create one!
+		LOG.info("| - Creating new container");
+
+		// Bind container ports to host ports
+		final Map<String, List<PortBinding>> portBindings = new HashMap<>();
+		portBindings.put(INTERNAL_PUBSUB_PORT, Collections.singletonList(PortBinding.randomPort("0.0.0.0")));
+
+		final HostConfig hostConfig = HostConfig.builder().portBindings(portBindings).build();
+
+		// Create new container with exposed ports
+		final ContainerConfig containerConfig = ContainerConfig.builder()
+			.hostConfig(hostConfig)
+			.exposedPorts(INTERNAL_PUBSUB_PORT)
+			.image(DOCKER_IMAGE_NAME)
+			.cmd("sh", "-c", "mkdir -p /opt/data/pubsub ; gcloud beta emulators pubsub start --data-dir=/opt/data/pubsub --host-port=0.0.0.0:" + INTERNAL_PUBSUB_PORT)
+			.build();
+
+		final ContainerCreation creation = docker.createContainer(containerConfig, CONTAINER_NAME_JUNIT);
+		id = creation.id();
+
+		containerInfo = docker.inspectContainer(id);
+
+		if (!containerInfo.state().running()) {
+			LOG.warn("| - Starting it up ....");
+			docker.startContainer(id);
+			Thread.sleep(1000);
+		}
+
+		containerInfo = docker.inspectContainer(id);
+
+		dockerIpAddress = "127.0.0.1";
+
+		Map<String, List<PortBinding>> ports = containerInfo.networkSettings().ports();
+
+		assertNotNull("Unable to retrieve the ports where to connect to the emulators", ports);
+		assertEquals("We expect 1 port to be mapped", 1, ports.size());
+
+		pubsubPort = getPort(ports, INTERNAL_PUBSUB_PORT, "PubSub");
+
+		LOG.info("| Waiting for the emulators to be running");
+
+		// PubSub exposes an "Ok" at the root url when running.
+		if (!waitForOkStatus("PubSub", pubsubPort)) {
+			// Oops, we did not get an "Ok" within 10 seconds
+			startHasFailedKillEverything();
+		}
+		LOG.info("\\===========================================");
+		LOG.info("");
+	}
+
+	/**
+	 * Logs a clearly visible failure banner, wipes the connection information and removes the container.
+	 */
+	private static void startHasFailedKillEverything() throws DockerException, InterruptedException {
+		LOG.error("|");
+		LOG.error("| ==================== ");
+		LOG.error("| YOUR TESTS WILL FAIL ");
+		LOG.error("| ==================== ");
+		LOG.error("|");
+
+		// Kill this container and wipe all connection information
+		dockerIpAddress = null;
+		pubsubPort = null;
+		terminateAndDiscardAnyExistingContainers(false);
+	}
+
+	private static final long MAX_RETRY_TIMEOUT = 10000; // Milliseconds
+
+	/**
+	 * Polls the root url of the emulator until the response contains "Ok" or the timeout expires.
+	 *
+	 * <p>The timeout is checked after every unsuccessful attempt, both when the request fails
+	 * and when it succeeds without returning "Ok", so this method always returns.
+	 *
+	 * @param label name of the emulator, used in the log output
+	 * @param port host port on which the emulator should be listening
+	 * @return true if "Ok" was received within {@link #MAX_RETRY_TIMEOUT} ms, false on timeout or interrupt
+	 */
+	private static boolean waitForOkStatus(String label, String port) {
+		long start = System.currentTimeMillis();
+		while (true) {
+			try {
+				if (rootUrlReportsOk(port)) {
+					LOG.info("| - {} Emulator is running at {}:{}", label, dockerIpAddress, port);
+					return true;
+				}
+			} catch (IOException e) {
+				// Not reachable (yet): fall through to the timeout check and try again
+			}
+
+			long now = System.currentTimeMillis();
+			if (now - start > MAX_RETRY_TIMEOUT) {
+				LOG.error("| - PubSub Emulator at {}:{} FAILED to return an Ok status within {} ms ", dockerIpAddress, port, MAX_RETRY_TIMEOUT);
+				return false;
+			}
+			try {
+				Thread.sleep(100); // Sleep a very short time
+			} catch (InterruptedException e1) {
+				// Keep the interrupt visible to the caller and stop waiting
+				Thread.currentThread().interrupt();
+				return false;
+			}
+		}
+	}
+
+	/**
+	 * Does a single GET on the root url of the emulator and checks the response for "Ok".
+	 * The reader and the connection are released on every path.
+	 */
+	private static boolean rootUrlReportsOk(String port) throws IOException {
+		URL url = new URL("http://" + dockerIpAddress + ":" + port + "/");
+		HttpURLConnection con = (HttpURLConnection) url.openConnection();
+		try {
+			con.setRequestMethod("GET");
+			con.setConnectTimeout(50);
+			con.setReadTimeout(50);
+
+			StringBuilder content = new StringBuilder();
+			try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
+				String inputLine;
+				while ((inputLine = in.readLine()) != null) {
+					content.append(inputLine);
+				}
+			}
+			return content.toString().contains("Ok");
+		} finally {
+			con.disconnect();
+		}
+	}
+
+	/**
+	 * Looks up the host port that has been bound to the given TCP port of the container.
+	 *
+	 * @param ports the port mappings as reported by docker
+	 * @param internalTCPPort the port inside the container (without the "/tcp" suffix)
+	 * @param label name of the emulator, used in the log output
+	 * @return the host port of the first binding, or null when nothing is bound to that port
+	 */
+	private static String getPort(Map<String, List<PortBinding>> ports, String internalTCPPort, String label) {
+		List<PortBinding> portMappings = ports.get(internalTCPPort + "/tcp");
+		if (portMappings == null || portMappings.isEmpty()) {
+			// Two placeholders for the two arguments
+			LOG.info("| {} Emulator --> NOTHING CONNECTED TO {}", label, internalTCPPort + "/tcp");
+			return null;
+		}
+
+		return portMappings.get(0).hostPort();
+	}
+
+	/**
+	 * Kills (when running) and removes the emulator container, if such a container exists.
+	 *
+	 * @param warnAboutExisting whether finding an existing container must be logged as a warning
+	 */
+	private static void terminateAndDiscardAnyExistingContainers(boolean warnAboutExisting) throws DockerException, InterruptedException {
+		ContainerInfo containerInfo;
+		try {
+			containerInfo = docker.inspectContainer(CONTAINER_NAME_JUNIT);
+			// Already have this container running.
+
+			assertNotNull("We should either we get containerInfo or we get an exception", containerInfo);
+
+			LOG.info("");
+			LOG.info("/===========================================");
+			if (warnAboutExisting) {
+				LOG.warn("| >>> FOUND OLD EMULATOR INSTANCE RUNNING <<< ");
+				LOG.warn("| Destroying that one to keep tests running smoothly.");
+			}
+			LOG.info("| Cleanup of GCloud Emulator");
+
+			// We REQUIRE 100% accurate side effect free unit tests
+			// So we completely discard this one.
+
+			String id = containerInfo.id();
+			// Kill container
+			if (containerInfo.state().running()) {
+				docker.killContainer(id);
+				LOG.info("| - Killed");
+			}
+
+			// Remove container
+			docker.removeContainer(id);
+
+			LOG.info("| - Removed");
+			LOG.info("\\===========================================");
+			LOG.info("");
+
+		} catch (ContainerNotFoundException cnfe) {
+			// No such container. Good !
+		}
+	}
+
+	/**
+	 * Removes the emulator container and closes the docker client.
+	 * Does nothing when the docker client was never created.
+	 */
+	public static void terminateDocker() throws DockerException, InterruptedException {
+		if (docker == null) {
+			// launchDocker() failed before the client existed, so there is nothing to clean up
+			return;
+		}
+		terminateAndDiscardAnyExistingContainers(false);
+
+		// Close the docker client
+		docker.close();
+	}
+
+	// ====================================================================================
+
+}
diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
new file mode 100644
index 0000000..b6a011a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.pubsub.emulator; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.spotify.docker.client.exceptions.DockerException; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Serializable; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.flink.streaming.connectors.pubsub.emulator.GCloudEmulatorManager.getDockerIpAddress; +import static org.apache.flink.streaming.connectors.pubsub.emulator.GCloudEmulatorManager.getDockerPubSubPort; + +/** + * The base class from which unit tests should inherit if they need to use the Google cloud emulators. + */ +public class GCloudUnitTestBase implements Serializable { + @BeforeClass + public static void launchGCloudEmulator() throws Exception { + // Separated out into separate class so the entire test class to be serializable + GCloudEmulatorManager.launchDocker(); + } + + @AfterClass + public static void terminateGCloudEmulator() throws DockerException, InterruptedException { + GCloudEmulatorManager.terminateDocker(); + } + + // ==================================================================================== + // Pubsub helpers + + private static ManagedChannel channel = null; + private static TransportChannelProvider channelProvider = null; + private static CredentialsProvider credentialsProvider = null; + + public static PubsubHelper getPubsubHelper() { + if (channel == null) { + //noinspection deprecation + channel = ManagedChannelBuilder + .forTarget(getPubSubHostPort()) + .usePlaintext(true) + .build(); + channelProvider = FixedTransportChannelProvider + .create(GrpcTransportChannel.create(channel)); + 
credentialsProvider = NoCredentialsProvider.create(); + } + return new PubsubHelper(channelProvider, credentialsProvider); + } + + public static String getPubSubHostPort() { + return getDockerIpAddress() + ":" + getDockerPubSubPort(); + } + + @AfterClass + public static void cleanupPubsubChannel() throws InterruptedException { + if (channel != null) { + channel.shutdownNow().awaitTermination(1, SECONDS); + channel = null; + } + } +} diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java new file mode 100644 index 0000000..f08576b --- /dev/null +++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub.emulator; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.NotFoundException; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminSettings; +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * A helper class to make managing the testing topics a bit easier. 
+ */ +public class PubsubHelper { + + private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class); + + private TransportChannelProvider channelProvider = null; + private CredentialsProvider credentialsProvider = null; + + private TopicAdminClient topicClient; + private SubscriptionAdminClient subscriptionAdminClient; + + public PubsubHelper() { + this(TopicAdminSettings.defaultTransportChannelProvider(), + TopicAdminSettings.defaultCredentialsProviderBuilder().build()); + } + + public PubsubHelper(TransportChannelProvider channelProvider, CredentialsProvider credentialsProvider) { + this.channelProvider = channelProvider; + this.credentialsProvider = credentialsProvider; + } + + public TransportChannelProvider getChannelProvider() { + return channelProvider; + } + + public CredentialsProvider getCredentialsProvider() { + return credentialsProvider; + } + + public TopicAdminClient getTopicAdminClient() throws IOException { + if (topicClient == null) { + TopicAdminSettings topicAdminSettings = TopicAdminSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + topicClient = TopicAdminClient.create(topicAdminSettings); + } + return topicClient; + } + + public Topic createTopic(String project, String topic) throws IOException { + deleteTopic(project, topic); + ProjectTopicName topicName = ProjectTopicName.of(project, topic); + TopicAdminClient adminClient = getTopicAdminClient(); + LOG.info("CreateTopic {}", topicName); + return adminClient.createTopic(topicName); + } + + public void deleteTopic(String project, String topic) throws IOException { + deleteTopic(ProjectTopicName.of(project, topic)); + } + + public void deleteTopic(ProjectTopicName topicName) throws IOException { +// LOG.info("CreateTopic {}", topicName); + TopicAdminClient adminClient = getTopicAdminClient(); + try { + Topic existingTopic = adminClient.getTopic(topicName); + + // If it exists we delete all 
subscriptions and the topic itself. + LOG.info("DeleteTopic {} first delete old subscriptions.", topicName); + adminClient + .listTopicSubscriptions(topicName) + .iterateAllAsProjectSubscriptionName() + .forEach(subscriptionAdminClient::deleteSubscription); + LOG.info("DeleteTopic {}", topicName); + adminClient + .deleteTopic(topicName); + } catch (NotFoundException e) { + // Doesn't exist. Good. + } + } + + public SubscriptionAdminClient getSubscriptionAdminClient() throws IOException { + if (subscriptionAdminClient == null) { + SubscriptionAdminSettings subscriptionAdminSettings = + SubscriptionAdminSettings + .newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings); + } + return subscriptionAdminClient; + } + + public void createSubscription(String subscriptionProject, String subscription, String topicProject, String topic) throws IOException { + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.newBuilder() + .setProject(subscriptionProject) + .setSubscription(subscription) + .build(); + + deleteSubscription(subscriptionName); + + SubscriptionAdminClient adminClient = getSubscriptionAdminClient(); + + ProjectTopicName topicName = ProjectTopicName.of(topicProject, topic); + + PushConfig pushConfig = PushConfig.getDefaultInstance(); + + LOG.info("CreateSubscription {}", subscriptionName); + getSubscriptionAdminClient().createSubscription(subscriptionName, topicName, pushConfig, 1); + } + + public void deleteSubscription(String subscriptionProject, String subscription) throws IOException { + deleteSubscription(ProjectSubscriptionName + .newBuilder() + .setProject(subscriptionProject) + .setSubscription(subscription) + .build()); + } + + public void deleteSubscription(ProjectSubscriptionName subscriptionName) throws IOException { + SubscriptionAdminClient adminClient = getSubscriptionAdminClient(); + 
try { + adminClient.getSubscription(subscriptionName); + // If it already exists we must first delete it. + LOG.info("DeleteSubscription {}", subscriptionName); + adminClient.deleteSubscription(subscriptionName); + } catch (NotFoundException e) { + // Doesn't exist. Good. + } + } + + // Mostly copied from the example on https://cloud.google.com/pubsub/docs/pull + public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception { + SubscriberStubSettings subscriberStubSettings = + SubscriberStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) { + // String projectId = "my-project-id"; + // String subscriptionId = "my-subscription-id"; + // int numOfMessages = 10; // max number of messages to be pulled + String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId); + PullRequest pullRequest = + PullRequest.newBuilder() + .setMaxMessages(maxNumberOfMessages) + .setReturnImmediately(false) // return immediately if messages are not available + .setSubscription(subscriptionName) + .build(); + + // use pullCallable().futureCall to asynchronously perform this operation + PullResponse pullResponse = subscriber.pullCallable().call(pullRequest); + List<String> ackIds = new ArrayList<>(); + for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { + // handle received message + // ... 
+ ackIds.add(message.getAckId()); + } + // acknowledge received messages + AcknowledgeRequest acknowledgeRequest = + AcknowledgeRequest.newBuilder() + .setSubscription(subscriptionName) + .addAllAckIds(ackIds) + .build(); + // use acknowledgeCallable().futureCall to asynchronously perform this operation + subscriber.acknowledgeCallable().call(acknowledgeRequest); + return pullResponse.getReceivedMessagesList(); + } + } + + public Subscriber subscribeToSubscription(String project, String subscription, MessageReceiver messageReceiver) { + ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(project, subscription); + Subscriber subscriber = + Subscriber + .newBuilder(subscriptionName, messageReceiver) + .setChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + subscriber.startAsync(); + return subscriber; + } + + public Publisher createPublisher(String project, String topic) throws IOException { + return Publisher + .newBuilder(ProjectTopicName.of(project, topic)) + .setChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + } + +} diff --git a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..b316a9a --- /dev/null +++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +log4j.rootLogger=INFO, testlogger +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.out +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 0950e2f..b17dc70 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -61,6 +61,7 @@ under the License. 
<module>flink-metrics-availability-test</module> <module>flink-metrics-reporter-prometheus-test</module> <module>flink-heavy-deployment-stress-test</module> + <module>flink-connector-pubsub-emulator-tests</module> <module>flink-streaming-kafka-test-base</module> <module>flink-streaming-kafka-test</module> <module>flink-streaming-kafka011-test</module> diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index af21512..957b3c6 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -136,6 +136,8 @@ run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scr run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java" run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala" +run_test "Test PubSub connector with Docker based Google PubSub Emulator" "$END_TO_END_DIR/test-scripts/test_streaming_pubsub.sh" + run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh" run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh b/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh new file mode 100755 index 0000000..8e08385 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +cd "${END_TO_END_DIR}/flink-connector-pubsub-emulator-tests" + +mvn test -DskipTests=false diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java index 612da41..c5791bd 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.examples.pubsub; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.pubsub.PubSubSink; -import org.apache.flink.streaming.connectors.pubsub.PubSubSourceBuilder; +import org.apache.flink.streaming.connectors.pubsub.PubSubSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,7 @@ public class PubSubExample { System.out.println("Missing parameters!\n" + "Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName " + "--google-project <google project name> "); - //return; 
+ return; } String projectName = parameterTool.getRequired("google-project"); @@ -62,10 +62,9 @@ public class PubSubExample { private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.addSource(PubSubSourceBuilder.<Integer>builder() + env.addSource(PubSubSource.<Integer>newBuilder() .withProjectSubscriptionName(projectName, subscriptionName) .withDeserializationSchema(new IntegerSerializer()) - .withMode(PubSubSourceBuilder.Mode.NONE) .build()) .map(PubSubExample::printAndReturn).disableChaining() .addSink(PubSubSink.<Integer>newBuilder()