This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a96277090358d2237aab7426ebde7fdf1eaccbe4
Author: Niels Basjes <nbas...@bol.com>
AuthorDate: Wed Aug 15 14:05:39 2018 +0200

    [FLINK-9311] [pubsub] Add unit and integration tests for PubSub connectors
---
 flink-connectors/flink-connector-pubsub/pom.xml    |  86 +++----
 .../flink/streaming/connectors/pubsub/Bound.java   |  17 ++
 .../connectors/pubsub/BoundedPubSubSource.java     |  33 ++-
 .../streaming/connectors/pubsub/PubSubSink.java    | 141 +++++------
 .../streaming/connectors/pubsub/PubSubSource.java  |  40 ++--
 .../connectors/pubsub/SubscriberWrapper.java       |  22 +-
 .../common/SerializableCredentialsProvider.java    |  11 +-
 .../streaming/connectors/pubsub/BoundTest.java     |  20 +-
 .../connectors/pubsub/BoundedPubSubSourceTest.java |  25 +-
 .../connectors/pubsub/PubSubSourceTest.java        |   5 +-
 .../src/test/resources/log4j-test.properties       |  24 ++
 .../flink-connector-pubsub-emulator-tests}/pom.xml | 112 +++++----
 .../connectors/pubsub/CheckPubSubEmulatorTest.java | 115 +++++++++
 .../connectors/pubsub/EmulatedPubSubSinkTest.java  | 109 +++++++++
 .../pubsub/EmulatedPubSubSourceTest.java           | 116 +++++++++
 .../pubsub/emulator/GCloudEmulatorManager.java     | 264 +++++++++++++++++++++
 .../pubsub/emulator/GCloudUnitTestBase.java        |  84 +++++++
 .../connectors/pubsub/emulator/PubsubHelper.java   | 232 ++++++++++++++++++
 .../src/test/resources/log4j-test.properties       |  24 ++
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 .../test-scripts/test_streaming_pubsub.sh          |  22 ++
 .../streaming/examples/pubsub/PubSubExample.java   |   7 +-
 23 files changed, 1309 insertions(+), 203 deletions(-)

diff --git a/flink-connectors/flink-connector-pubsub/pom.xml 
b/flink-connectors/flink-connector-pubsub/pom.xml
index a6e8d72..f50cbdd 100644
--- a/flink-connectors/flink-connector-pubsub/pom.xml
+++ b/flink-connectors/flink-connector-pubsub/pom.xml
@@ -35,12 +35,20 @@ under the License.
 
        <packaging>jar</packaging>
 
-       <properties>
-               <pubsub.version>1.37.1</pubsub.version>
-       </properties>
+       <!-- This is the way we get a consistent set of versions of the Google 
tools -->
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>com.google.cloud</groupId>
+                               <artifactId>google-cloud-bom</artifactId>
+                               <version>0.53.0-alpha</version>
+                               <type>pom</type>
+                               <scope>import</scope>
+                       </dependency>
+               </dependencies>
+       </dependencyManagement>
 
        <dependencies>
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
@@ -51,7 +59,29 @@ under the License.
                <dependency>
                        <groupId>com.google.cloud</groupId>
                        <artifactId>google-cloud-pubsub</artifactId>
-                       <version>${pubsub.version}</version>
+                       <!-- Version is pulled from google-cloud-bom -->
+                       <exclusions>
+                               <!-- Exclude an old version of guava that is 
being pulled
+                in by a transitive dependency of google-api-client -->
+                               <exclusion>
+                                       <groupId>com.google.guava</groupId>
+                                       <artifactId>guava-jdk5</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <version>${slf4j.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-log4j12</artifactId>
+                       <version>${slf4j.version}</version>
+                       <scope>test</scope>
                </dependency>
 
                <dependency>
@@ -69,51 +99,5 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
-
        </dependencies>
-
-    <build>
-        <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>shade-flink</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>shade</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<shadeTestJar>false</shadeTestJar>
-                                                       <artifactSet>
-                                                               <includes>
-                                                                       
<include>*:*</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                                       <relocations>
-                                                               <relocation>
-                                                                       
<pattern>com.google.guava</pattern>
-                                                                       
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
-                                                               </relocation>
-                                                               <relocation>
-                                                                       
<pattern>com.google.common.base</pattern>
-                                                                       
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
-                                                               </relocation>
-                                                               <relocation>
-                                                                       
<pattern>io.grpc.auth</pattern>
-                                                                       
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
-                                                               </relocation>
-                                                               <relocation>
-                                                                       
<pattern>io.grpc.protobuf</pattern>
-                                                                       
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
-                                                               </relocation>
-                                                       </relocations>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-        </plugins>
-    </build>
-
 </project>
diff --git 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
index b37cc45..727f32e 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.pubsub;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
diff --git 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
index 5f829ae..83fc15e 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.pubsub;
 
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
@@ -5,7 +22,11 @@ import com.google.pubsub.v1.PubsubMessage;
 
 import java.io.IOException;
 
-class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
+/**
+ * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop 
at some point. For example after a period of idle or and after n amount of 
messages have been received.
+ *
+ */
+public class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
        private Bound<OUT> bound;
 
        private BoundedPubSubSource() {
@@ -28,11 +49,19 @@ class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
                bound.receivedMessage();
        }
 
+       /**
+        * Creates a {@link BoundedPubSubSourceBuilder}.
+        * @param <OUT> Type of Object which will be read by the produced 
{@link BoundedPubSubSource}
+        */
        @SuppressWarnings("unchecked")
        public static <OUT> BoundedPubSubSourceBuilder<OUT, ? extends 
PubSubSource, ? extends BoundedPubSubSourceBuilder> newBuilder() {
                return new BoundedPubSubSourceBuilder<>(new 
BoundedPubSubSource<OUT>());
        }
 
+       /**
+        * Builder to create BoundedPubSubSource.
+        * @param <OUT> Type of Object which will be read by the 
BoundedPubSubSource
+        */
        @SuppressWarnings("unchecked")
        public static class BoundedPubSubSourceBuilder<OUT, PSS extends 
BoundedPubSubSource<OUT>, BUILDER extends BoundedPubSubSourceBuilder<OUT, PSS, 
BUILDER>> extends PubSubSourceBuilder<OUT, PSS, BUILDER> {
                private Long boundedByAmountOfMessages;
@@ -52,7 +81,7 @@ class BoundedPubSubSource<OUT> extends PubSubSource<OUT> {
                        return (BUILDER) this;
                }
 
-               private Bound <OUT> createBound() {
+               private Bound<OUT> createBound() {
                        if (boundedByAmountOfMessages != null && 
boundedByTimeSinceLastMessage != null) {
                                return 
Bound.boundByAmountOfMessagesOrTimeSinceLastMessage(boundedByAmountOfMessages, 
boundedByTimeSinceLastMessage);
                        }
diff --git 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
index 92c9c6c..e6ac53e 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.pubsub;
 
-import com.google.api.gax.core.NoCredentialsProvider;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -40,22 +39,35 @@ import java.io.IOException;
 
 /**
  * A sink function that outputs to PubSub.
+ *
  * @param <IN> type of PubSubSink messages to write
  */
 public class PubSubSink<IN> extends RichSinkFunction<IN> {
 
-       private final SerializableCredentialsProvider 
serializableCredentialsProvider;
-       private final SerializationSchema<IN>         serializationSchema;
-       private final String                          projectName;
-       private final String                          topicName;
-       private       String                          hostAndPort = null;
+       private SerializableCredentialsProvider serializableCredentialsProvider;
+       private SerializationSchema<IN> serializationSchema;
+       private String projectName;
+       private String topicName;
+       private String hostAndPort = null;
 
        private transient Publisher publisher;
 
-       public PubSubSink(SerializableCredentialsProvider 
serializableCredentialsProvider, SerializationSchema<IN> serializationSchema, 
String projectName, String topicName) {
+       private PubSubSink() {
+       }
+
+       void setSerializableCredentialsProvider(SerializableCredentialsProvider 
serializableCredentialsProvider) {
                this.serializableCredentialsProvider = 
serializableCredentialsProvider;
+       }
+
+       void setSerializationSchema(SerializationSchema<IN> 
serializationSchema) {
                this.serializationSchema = serializationSchema;
+       }
+
+       void setProjectName(String projectName) {
                this.projectName = projectName;
+       }
+
+       void setTopicName(String topicName) {
                this.topicName = topicName;
        }
 
@@ -64,15 +76,29 @@ public class PubSubSink<IN> extends RichSinkFunction<IN> {
         * The ONLY reason to use this is during tests with the emulator 
provided by Google.
         *
         * @param hostAndPort The combination of hostname and port to connect 
to ("hostname:1234")
-        * @return The current instance
         */
-       public PubSubSink<IN> withHostAndPort(String hostAndPort) {
+       void withHostAndPort(String hostAndPort) {
                this.hostAndPort = hostAndPort;
-               return this;
        }
 
-       private transient ManagedChannel   managedChannel = null;
-       private transient TransportChannel channel        = null;
+       void initialize() throws IOException {
+               if (serializableCredentialsProvider == null) {
+                       serializableCredentialsProvider = 
SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+               }
+               if (serializationSchema == null) {
+                       throw new IllegalArgumentException("The 
serializationSchema has not been specified.");
+               }
+               if (projectName == null) {
+                       throw new IllegalArgumentException("The projectName has 
not been specified.");
+               }
+               if (topicName == null) {
+                       throw new IllegalArgumentException("The topicName has 
not been specified.");
+               }
+       }
+
+
+       private transient ManagedChannel managedChannel = null;
+       private transient TransportChannel channel = null;
 
        @Override
        public void open(Configuration configuration) throws Exception {
@@ -114,127 +140,110 @@ public class PubSubSink<IN> extends 
RichSinkFunction<IN> {
 
        /**
         * Create a builder for a new PubSubSink.
+        *
         * @param <IN> The generic of the type that is to be written into the 
sink.
         * @return a new PubSubSinkBuilder instance
         */
-       public static <IN> PubSubSinkBuilder<IN> newBuilder() {
-               return new PubSubSinkBuilder<>();
+       public static <IN> PubSubSinkBuilder<IN, ? extends PubSubSink<IN>, ? 
extends PubSubSinkBuilder<IN, ?, ?>> newBuilder() {
+               return new PubSubSinkBuilder<>(new PubSubSink<>());
        }
 
        /**
         * PubSubSinkBuilder to create a PubSubSink.
+        *
         * @param <IN> Type of PubSubSink to create.
         */
-       public static class PubSubSinkBuilder<IN> {
-               private SerializableCredentialsProvider 
serializableCredentialsProvider = null;
-               private SerializationSchema<IN>         serializationSchema     
        = null;
-               private String                          projectName             
        = null;
-               private String                          topicName               
        = null;
-               private String                          hostAndPort             
        = null;
-
-               private PubSubSinkBuilder() {
+       @SuppressWarnings("unchecked")
+       public static class PubSubSinkBuilder<IN, PSS extends PubSubSink<IN>, 
BUILDER extends PubSubSinkBuilder<IN, PSS, BUILDER>> {
+               protected PSS sinkUnderConstruction;
+
+               private PubSubSinkBuilder(PSS sinkUnderConstruction) {
+                       this.sinkUnderConstruction = sinkUnderConstruction;
                }
 
                /**
                 * Set the credentials.
                 * If this is not used then the credentials are picked up from 
the environment variables.
+                *
                 * @param credentials the Credentials needed to connect.
                 * @return The current PubSubSinkBuilder instance
                 */
-               public PubSubSinkBuilder<IN> withCredentials(Credentials 
credentials) {
-                       this.serializableCredentialsProvider = new 
SerializableCredentialsProvider(credentials);
-                       return this;
+               public BUILDER withCredentials(Credentials credentials) {
+                       
sinkUnderConstruction.setSerializableCredentialsProvider(new 
SerializableCredentialsProvider(credentials));
+                       return (BUILDER) this;
                }
 
                /**
                 * Set the CredentialsProvider.
                 * If this is not used then the credentials are picked up from 
the environment variables.
+                *
                 * @param credentialsProvider the custom 
SerializableCredentialsProvider instance.
                 * @return The current PubSubSinkBuilder instance
                 */
-               public PubSubSinkBuilder<IN> 
withCredentialsProvider(CredentialsProvider credentialsProvider) throws 
IOException {
+               public BUILDER withCredentialsProvider(CredentialsProvider 
credentialsProvider) throws IOException {
                        return 
withCredentials(credentialsProvider.getCredentials());
                }
 
                /**
                 * Set the credentials to be absent.
                 * This means that no credentials are to be used at all.
+                *
                 * @return The current PubSubSinkBuilder instance
                 */
-               public PubSubSinkBuilder<IN> withoutCredentials() {
-                       this.serializableCredentialsProvider = 
SerializableCredentialsProvider.withoutCredentials();
-                       return this;
+               public BUILDER withoutCredentials() {
+                       
sinkUnderConstruction.setSerializableCredentialsProvider(SerializableCredentialsProvider.withoutCredentials());
+                       return (BUILDER) this;
                }
 
                /**
                 * @param serializationSchema Instance of a SerializationSchema 
that converts the IN into a byte[]
                 * @return The current PubSubSinkBuilder instance
                 */
-               public PubSubSinkBuilder<IN> 
withSerializationSchema(SerializationSchema<IN> serializationSchema) {
-                       this.serializationSchema = serializationSchema;
-                       return this;
+               public BUILDER withSerializationSchema(SerializationSchema<IN> 
serializationSchema) {
+                       
sinkUnderConstruction.setSerializationSchema(serializationSchema);
+                       return (BUILDER) this;
                }
 
                /**
                 * @param projectName The name of the project in PubSub
                 * @return The current PubSubSinkBuilder instance
                 */
-               public PubSubSinkBuilder<IN> withProjectName (String 
projectName) {
-                       this.projectName = projectName;
-                       return this;
+               public BUILDER withProjectName(String projectName) {
+                       sinkUnderConstruction.setProjectName(projectName);
+                       return (BUILDER) this;
                }
 
                /**
                 * @param topicName The name of the topic in PubSub
                 * @return The current PubSubSinkBuilder instance
                 */
-               public PubSubSinkBuilder<IN> withTopicName (String topicName) {
-                       this.topicName = topicName;
-                       return this;
+               public BUILDER withTopicName(String topicName) {
+                       sinkUnderConstruction.setTopicName(topicName);
+                       return (BUILDER) this;
                }
 
-
                /**
                 * Set the custom hostname/port combination of PubSub.
                 * The ONLY reason to use this is during tests with the 
emulator provided by Google.
+                *
                 * @param hostAndPort The combination of hostname and port to 
connect to ("hostname:1234")
                 * @return The current PubSubSinkBuilder instance
                 */
-               public PubSubSinkBuilder<IN> withHostAndPort(String 
hostAndPort) {
-                       this.hostAndPort = hostAndPort;
-                       return this;
+               public BUILDER withHostAndPort(String hostAndPort) {
+                       sinkUnderConstruction.withHostAndPort(hostAndPort);
+                       return (BUILDER) this;
                }
 
                /**
                 * Actually builder the desired instance of the PubSubSink.
+                *
                 * @return a brand new PubSubSink
-                * @throws IOException incase of a problem getting the 
credentials
+                * @throws IOException              incase of a problem getting 
the credentials
                 * @throws IllegalArgumentException incase required fields were 
not specified.
                 */
                public PubSubSink<IN> build() throws IOException {
-                       if (serializableCredentialsProvider == null) {
-                               serializableCredentialsProvider = 
SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
-                       }
-                       if (serializationSchema == null) {
-                               throw new IllegalArgumentException("The 
serializationSchema has not been specified.");
-                       }
-                       if (projectName == null) {
-                               throw new IllegalArgumentException("The 
projectName has not been specified.");
-                       }
-                       if (topicName == null) {
-                               throw new IllegalArgumentException("The 
topicName has not been specified.");
-                       }
-
-                       PubSubSink<IN> pubSubSink = new PubSubSink<>(
-                               serializableCredentialsProvider,
-                               serializationSchema,
-                               projectName, topicName);
-
-                       if (hostAndPort != null) {
-                               pubSubSink.withHostAndPort(hostAndPort);
-                       }
-
-                       return pubSubSink;
+                       sinkUnderConstruction.initialize();
+                       return sinkUnderConstruction;
                }
        }
 
diff --git 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
index 8f8c689..2d93998 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
@@ -43,7 +43,7 @@ import java.util.List;
  */
 public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OUT, String, AckReplyConsumer> 
implements MessageReceiver, ResultTypeQueryable<OUT>, 
ParallelSourceFunction<OUT> {
        private DeserializationSchema<OUT> deserializationSchema;
-       private SubscriberWrapper          subscriberWrapper;
+       private SubscriberWrapper subscriberWrapper;
 
        protected transient SourceContext<OUT> sourceContext = null;
 
@@ -64,7 +64,8 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
                super.open(configuration);
                subscriberWrapper.initialize(this);
                if (hasNoCheckpointingEnabled(getRuntimeContext())) {
-                       throw new IllegalArgumentException("Checkpointing needs 
to be enabled to support: PubSub ATLEAST_ONCE");
+                       throw new IllegalArgumentException("The PubSubSource 
REQUIRES Checkpointing to be enabled and " +
+                               "the checkpointing frequency must be MUCH lower 
than the PubSub timeout for it to retry a message.");
                }
        }
 
@@ -123,26 +124,27 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
                return deserializationSchema.getProducedType();
        }
 
-       @SuppressWarnings("unchecked")
-       public static <OUT> PubSubSourceBuilder<OUT, ? extends PubSubSource, ? 
extends PubSubSourceBuilder> newBuilder() {
-               return new PubSubSourceBuilder<>(new PubSubSource<OUT>());
+       public static <OUT> PubSubSourceBuilder<OUT, ? extends 
PubSubSource<OUT>, ? extends PubSubSourceBuilder<OUT, ?, ?>> newBuilder() {
+               return new PubSubSourceBuilder<>(new PubSubSource<>());
        }
 
        /**
         * Builder to create PubSubSource.
-        * @param <OUT> The type of objects which will be read
-        * @param <PSS> The type of PubSubSource
+        *
+        * @param <OUT>     The type of objects which will be read
+        * @param <PSS>     The type of PubSubSource
         * @param <BUILDER> The type of Builder to create the PubSubSource
         */
+       @SuppressWarnings("unchecked")
        public static class PubSubSourceBuilder<OUT, PSS extends 
PubSubSource<OUT>, BUILDER extends PubSubSourceBuilder<OUT, PSS, BUILDER>> {
-               protected PSS                                                   
sourceUnderConstruction;
+               protected PSS sourceUnderConstruction;
 
-               private SubscriberWrapper               subscriberWrapper = 
null;
+               private SubscriberWrapper subscriberWrapper = null;
                private SerializableCredentialsProvider 
serializableCredentialsProvider;
-               private DeserializationSchema<OUT>      deserializationSchema;
-               private String                          projectName;
-               private String                          subscriptionName;
-               private String                          hostAndPort;
+               private DeserializationSchema<OUT> deserializationSchema;
+               private String projectName;
+               private String subscriptionName;
+               private String hostAndPort;
 
                protected PubSubSourceBuilder(PSS sourceUnderConstruction) {
                        this.sourceUnderConstruction = sourceUnderConstruction;
@@ -151,6 +153,7 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
                /**
                 * Set the credentials.
                 * If this is not used then the credentials are picked up from 
the environment variables.
+                *
                 * @param credentials the Credentials needed to connect.
                 * @return The current PubSubSourceBuilder instance
                 */
@@ -162,6 +165,7 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
                /**
                 * Set the CredentialsProvider.
                 * If this is not used then the credentials are picked up from 
the environment variables.
+                *
                 * @param credentialsProvider the custom 
SerializableCredentialsProvider instance.
                 * @return The current PubSubSourceBuilder instance
                 */
@@ -172,6 +176,7 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
                /**
                 * Set the credentials to be absent.
                 * This means that no credentials are to be used at all.
+                *
                 * @return The current PubSubSourceBuilder instance
                 */
                public BUILDER withoutCredentials() {
@@ -183,13 +188,13 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
                 * @param deserializationSchema Instance of a 
DeserializationSchema that converts the OUT into a byte[]
                 * @return The current PubSubSourceBuilder instance
                 */
-               public BUILDER withDeserializationSchema(DeserializationSchema 
<OUT> deserializationSchema) {
+               public BUILDER 
withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
                        this.deserializationSchema = deserializationSchema;
                        return (BUILDER) this;
                }
 
                /**
-                * @param projectName The name of the project in 
GoogleCloudPlatform
+                * @param projectName      The name of the project in 
GoogleCloudPlatform
                 * @param subscriptionName The name of the subscription in 
PubSub
                 * @return The current PubSubSourceBuilder instance
                 */
@@ -202,6 +207,7 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
                /**
                 * Set the custom hostname/port combination of PubSub.
                 * The ONLY reason to use this is during tests with the 
emulator provided by Google.
+                *
                 * @param hostAndPort The combination of hostname and port to 
connect to ("hostname:1234")
                 * @return The current PubSubSourceBuilder instance
                 */
@@ -213,6 +219,7 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
                /**
                 * Set a complete SubscriberWrapper.
                 * The ONLY reason to use this is during tests.
+                *
                 * @param subscriberWrapper The fully instantiated 
SubscriberWrapper
                 * @return The current PubSubSourceBuilder instance
                 */
@@ -223,8 +230,9 @@ public class PubSubSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase
 
                /**
                 * Actually build the desired instance of the 
PubSubSourceBuilder.
+                *
                 * @return a brand new SourceFunction
-                * @throws IOException incase of a problem getting the 
credentials
+                * @throws IOException              incase of a problem getting 
the credentials
                 * @throws IllegalArgumentException incase required fields were 
not specified.
                 */
                public PSS build() throws IOException {
diff --git 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
index fb75f43..0595877 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
@@ -33,13 +33,13 @@ import java.io.Serializable;
 
 class SubscriberWrapper implements Serializable {
        private final SerializableCredentialsProvider 
serializableCredentialsProvider;
-       private final String                          projectId;
-       private final String                          subscriptionId;
-       private       String                          hostAndPort = null;
+       private final String projectId;
+       private final String subscriptionId;
+       private String hostAndPort = null;
 
-       private transient Subscriber       subscriber;
-       private transient ManagedChannel   managedChannel = null;
-       private transient TransportChannel channel        = null;
+       private transient Subscriber subscriber;
+       private transient ManagedChannel managedChannel = null;
+       private transient TransportChannel channel = null;
 
        SubscriberWrapper(SerializableCredentialsProvider 
serializableCredentialsProvider, ProjectSubscriptionName 
projectSubscriptionName) {
                this.serializableCredentialsProvider = 
serializableCredentialsProvider;
@@ -49,14 +49,14 @@ class SubscriberWrapper implements Serializable {
 
        void initialize(MessageReceiver messageReceiver) {
                Subscriber.Builder builder = Subscriber
-                               
.newBuilder(ProjectSubscriptionName.of(projectId, subscriptionId), 
messageReceiver)
-                               
.setCredentialsProvider(serializableCredentialsProvider);
+                       .newBuilder(ProjectSubscriptionName.of(projectId, 
subscriptionId), messageReceiver)
+                       
.setCredentialsProvider(serializableCredentialsProvider);
 
                if (hostAndPort != null) {
                        managedChannel = ManagedChannelBuilder
-                                       .forTarget(hostAndPort)
-                                       .usePlaintext(true) // This is 'Ok' 
because this is ONLY used for testing.
-                                       .build();
+                               .forTarget(hostAndPort)
+                               .usePlaintext(true) // This is 'Ok' because 
this is ONLY used for testing.
+                               .build();
                        channel = 
GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
                        
builder.setChannelProvider(FixedTransportChannelProvider.create(channel));
                }
diff --git 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
index bd04058..44b1fa0 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
@@ -40,7 +40,9 @@ public class SerializableCredentialsProvider implements 
CredentialsProvider, Ser
        }
 
        /**
-        * Creates a SerializableCredentialsProvider for a PubSubSubscription 
based on environment variables. {@link 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+        * Creates a SerializableCredentialsProvider for a PubSubSubscription 
based on environment variables.
+        * {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+        *
         * @return serializableCredentialsProvider
         * @throws IOException thrown by {@link Credentials}
         */
@@ -50,11 +52,12 @@ public class SerializableCredentialsProvider implements 
CredentialsProvider, Ser
        }
 
        /**
-        * Creates a SerializableCredentialsProvider for a PubSubSubscription 
without any credentials. {@link 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
+        * Creates a SerializableCredentialsProvider for a PubSubSubscription 
without any credentials.
+        * {@link com.google.cloud.pubsub.v1.SubscriptionAdminSettings}
         * This is ONLY useful when running tests locally against Mockito or 
the Google PubSub emulator
-        * @see <a href="https://cloud.google.com/pubsub/docs/emulator"; 
target="_top">https://cloud.google.com/pubsub/docs/emulator</a>.
+        * @see <a href="https://cloud.google.com/pubsub/docs/emulator"; 
target="_top">https://cloud.google.com/pubsub/docs/emulator</a>
         * @return serializableCredentialsProvider
-        * @throws IOException thrown by {@link Credentials}
+        * @see <a href="https://cloud.google.com/pubsub/docs/emulator"; 
target="_top">https://cloud.google.com/pubsub/docs/emulator</a>
         */
        public static SerializableCredentialsProvider withoutCredentials() {
                return new 
SerializableCredentialsProvider(NoCredentials.getInstance());
diff --git 
a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
 
b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
index f98731b..a340ae9 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.pubsub;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -5,9 +22,9 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.junit.Test;
 
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 /**
  * Test for {@link Bound}.
@@ -107,6 +124,7 @@ public class BoundTest {
                try {
                        Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
+                       // Ignore any exceptions
                }
        }
 }
diff --git 
a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
 
b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
index 805f823..5f938fd 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.streaming.connectors.pubsub;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -13,9 +30,9 @@ import org.junit.Test;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 /**
  * Tests for {@link BoundedPubSubSource}.
@@ -63,8 +80,8 @@ public class BoundedPubSubSourceTest {
 
        private PubsubMessage pubSubMessage() {
                return PubsubMessage.newBuilder()
-                               .setMessageId("message-id")
-                               
.setData(ByteString.copyFrom("some-message".getBytes()))
-                               .build();
+                       .setMessageId("message-id")
+                       .setData(ByteString.copyFrom("some-message".getBytes()))
+                       .build();
        }
 }
diff --git 
a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
 
b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
index 73ca53b..9db5d7d 100644
--- 
a/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
+++ 
b/flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/PubSubSourceTest.java
@@ -42,7 +42,6 @@ import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
-
 /**
  * Test for {@link SourceFunction}.
  */
@@ -142,7 +141,7 @@ public class PubSubSourceTest {
 
        private PubsubMessage pubSubMessage() {
                return PubsubMessage.newBuilder()
-                                                       
.setData(ByteString.copyFrom(SERIALIZED_MESSAGE))
-                                                       .build();
+                       .setData(ByteString.copyFrom(SERIALIZED_MESSAGE))
+                       .build();
        }
 }
diff --git 
a/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..b316a9a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+log4j.rootLogger=INFO, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.out
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-connectors/flink-connector-pubsub/pom.xml 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
similarity index 50%
copy from flink-connectors/flink-connector-pubsub/pom.xml
copy to flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
index a6e8d72..7dd0d15 100644
--- a/flink-connectors/flink-connector-pubsub/pom.xml
+++ b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
@@ -25,33 +25,75 @@ under the License.
 
        <parent>
                <groupId>org.apache.flink</groupId>
-               <artifactId>flink-connectors</artifactId>
+               <artifactId>flink-end-to-end-tests</artifactId>
                <version>1.7-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
 
-       <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
-       <name>flink-connector-pubsub</name>
+       <artifactId>flink-connector-pubsub-emulator-tests</artifactId>
+       <name>flink-connector-pubsub-emulator-tests</name>
 
        <packaging>jar</packaging>
 
-       <properties>
-               <pubsub.version>1.37.1</pubsub.version>
-       </properties>
+       <!-- This is the way we get a consistent set of versions of the Google 
tools -->
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>com.google.cloud</groupId>
+                               <artifactId>google-cloud-bom</artifactId>
+                               <version>0.53.0-alpha</version>
+                               <type>pom</type>
+                               <scope>import</scope>
+                       </dependency>
+               </dependencies>
+       </dependencyManagement>
 
        <dependencies>
 
+               <!--All dependencies are   <scope>test</scope> -->
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
-                       <scope>provided</scope>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
                </dependency>
 
                <dependency>
                        <groupId>com.google.cloud</groupId>
                        <artifactId>google-cloud-pubsub</artifactId>
-                       <version>${pubsub.version}</version>
+                       <!-- Version is pulled from google-cloud-bom -->
+                       <exclusions>
+                               <!-- Exclude an old version of guava that is 
being pulled
+                in by a transitive dependency of google-api-client -->
+                               <exclusion>
+                                       <groupId>com.google.guava</groupId>
+                                       <artifactId>guava-jdk5</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-log4j12</artifactId>
+                       <version>${slf4j.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- This is used to run the local PubSub -->
+               <dependency>
+                       <groupId>com.spotify</groupId>
+                       <artifactId>docker-client</artifactId>
+                       <version>8.11.7</version>
+                       <scope>test</scope>
                </dependency>
 
                <dependency>
@@ -69,51 +111,39 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
-
        </dependencies>
 
-    <build>
-        <plugins>
+       <!-- ONLY run the tests when explicitly told to do so  -->
+       <properties>
+               <skipTests>true</skipTests>
+       </properties>
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.12.4</version>
+                               <configuration>
+                                       <skipTests>${skipTests}</skipTests>
+                               </configuration>
+                       </plugin>
+                       <!-- Disabling convergence check because there are 
multiple problems within the used pubsub dependencies -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
+                               <artifactId>maven-enforcer-plugin</artifactId>
                                <executions>
                                        <execution>
-                                               <id>shade-flink</id>
-                                               <phase>package</phase>
+                                               <id>dependency-convergence</id>
                                                <goals>
-                                                       <goal>shade</goal>
+                                                       <goal>enforce</goal>
                                                </goals>
                                                <configuration>
-                                                       
<shadeTestJar>false</shadeTestJar>
-                                                       <artifactSet>
-                                                               <includes>
-                                                                       
<include>*:*</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                                       <relocations>
-                                                               <relocation>
-                                                                       
<pattern>com.google.guava</pattern>
-                                                                       
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.guava</shadedPattern>
-                                                               </relocation>
-                                                               <relocation>
-                                                                       
<pattern>com.google.common.base</pattern>
-                                                                       
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.com.google.common.base</shadedPattern>
-                                                               </relocation>
-                                                               <relocation>
-                                                                       
<pattern>io.grpc.auth</pattern>
-                                                                       
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.auth</shadedPattern>
-                                                               </relocation>
-                                                               <relocation>
-                                                                       
<pattern>io.grpc.protobuf</pattern>
-                                                                       
<shadedPattern>org.apache.flink.streaming.connectors.pubsub.shaded.io.grpc.protobuf</shadedPattern>
-                                                               </relocation>
-                                                       </relocations>
+                                                       <skip>true</skip>
                                                </configuration>
                                        </execution>
                                </executions>
                        </plugin>
-        </plugins>
-    </build>
+               </plugins>
+       </build>
 
 </project>
diff --git 
a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java
 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java
new file mode 100644
index 0000000..a54d47b
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/CheckPubSubEmulatorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import 
org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to ensure the docker image with PubSub is working correctly.
+ */
+public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckPubSubEmulatorTest.class);
+
+       private static final String PROJECT_NAME = "Project";
+       private static final String TOPIC_NAME = "Topic";
+       private static final String SUBSCRIPTION_NAME = "Subscription";
+
+       private static PubsubHelper pubsubHelper = getPubsubHelper();
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+               pubsubHelper.createSubscription(PROJECT_NAME, 
SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               pubsubHelper.deleteSubscription(PROJECT_NAME, 
SUBSCRIPTION_NAME);
+               pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+       }
+
+       @Test
+       public void testPull() throws Exception {
+               Publisher publisher = 
pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+               publisher
+                       .publish(PubsubMessage
+                               .newBuilder()
+                               .setData(ByteString.copyFromUtf8("Hello World 
PULL"))
+                               .build())
+                       .get();
+
+               List<ReceivedMessage> receivedMessages = 
pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 10);
+               assertEquals(1, receivedMessages.size());
+               assertEquals("Hello World PULL", 
receivedMessages.get(0).getMessage().getData().toStringUtf8());
+
+               publisher.shutdown();
+       }
+
+       @Test
+       public void testPush() throws Exception {
+               List<PubsubMessage> receivedMessages = new ArrayList<>();
+               Subscriber subscriber = pubsubHelper.
+                       subscribeToSubscription(
+                               PROJECT_NAME,
+                               SUBSCRIPTION_NAME,
+                               (message, consumer) -> 
receivedMessages.add(message)
+                       );
+
+               Publisher publisher = 
pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+               publisher
+                       .publish(PubsubMessage
+                               .newBuilder()
+                               .setData(ByteString.copyFromUtf8("Hello World"))
+                               .build())
+                       .get();
+
+               LOG.info("Waiting a while to receive the message...");
+               Thread.sleep(1000);
+
+               assertEquals(1, receivedMessages.size());
+               assertEquals("Hello World", 
receivedMessages.get(0).getData().toStringUtf8());
+
+               try {
+                       subscriber.stopAsync().awaitTerminated(100, 
MILLISECONDS);
+               } catch (TimeoutException tme) {
+                       // Yeah, whatever. Don't care about clean shutdown here.
+               }
+               publisher.shutdown();
+       }
+
+}
diff --git 
a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
new file mode 100644
index 0000000..165579f
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSinkTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of the PubSub SINK with the Google PubSub emulator.
+ */
+public class EmulatedPubSubSinkTest extends GCloudUnitTestBase {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(EmulatedPubSubSinkTest.class);
+
+       private static final String PROJECT_NAME = "FLProject";
+       private static final String TOPIC_NAME = "FLTopic";
+       private static final String SUBSCRIPTION_NAME = "FLSubscription";
+
+       private static PubsubHelper pubsubHelper;
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               pubsubHelper = getPubsubHelper();
+               pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+               pubsubHelper.createSubscription(PROJECT_NAME, 
SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               pubsubHelper.deleteSubscription(PROJECT_NAME, 
SUBSCRIPTION_NAME);
+               pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+       }
+
+       @Test
+       public void testFlinkSink() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               List<String> input = Arrays.asList("One", "Two", "Three", 
"Four", "Five", "Six", "Seven", "Eigth", "Nine", "Ten");
+
+               // Create test stream
+               DataStream<String> theData = env
+                       .fromCollection(input)
+                       .name("Test input")
+                       .map((MapFunction<String, String>) 
StringUtils::reverse);
+
+               // Sink into pubsub
+               theData
+                       .addSink(PubSubSink.<String>newBuilder()
+                               .withProjectName(PROJECT_NAME)
+                               .withTopicName(TOPIC_NAME)
+                               .withSerializationSchema(new 
SimpleStringSchema())
+                               // Specific for emulator
+                               
.withCredentialsProvider(getPubsubHelper().getCredentialsProvider())
+                               .withHostAndPort(getPubSubHostPort())
+                               .build())
+                       .name("PubSub sink");
+
+               // Run
+               env.execute();
+
+               // Now get the result from PubSub and verify if everything is 
there
+               List<ReceivedMessage> receivedMessages = 
pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 100);
+
+               assertEquals("Wrong number of elements", input.size(), 
receivedMessages.size());
+
+               // Check output strings
+               List<String> output = new ArrayList<>();
+               receivedMessages.forEach(msg -> 
output.add(msg.getMessage().getData().toStringUtf8()));
+
+               for (String test : input) {
+                       assertTrue("Missing " + test, 
output.contains(StringUtils.reverse(test)));
+               }
+       }
+
+}
diff --git 
a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
new file mode 100644
index 0000000..aaa4fc3
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/EmulatedPubSubSourceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.pubsub.emulator.GCloudUnitTestBase;
+import org.apache.flink.streaming.connectors.pubsub.emulator.PubsubHelper;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test of the PubSub SOURCE with the Google PubSub emulator.
+ */
+public class EmulatedPubSubSourceTest extends GCloudUnitTestBase {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(EmulatedPubSubSourceTest.class);
+
+       private static final String PROJECT_NAME = "FLProject";
+       private static final String TOPIC_NAME = "FLTopic";
+       private static final String SUBSCRIPTION_NAME = "FLSubscription";
+
+       private static PubsubHelper pubsubHelper;
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               pubsubHelper = getPubsubHelper();
+               pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
+               pubsubHelper.createSubscription(PROJECT_NAME, 
SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               pubsubHelper.deleteSubscription(PROJECT_NAME, 
SUBSCRIPTION_NAME);
+               pubsubHelper.deleteTopic(PROJECT_NAME, TOPIC_NAME);
+       }
+
+       @Test
+       public void testFlinkSource() throws Exception {
+               // Create some messages and put them into pubsub
+               List<String> input = Arrays.asList("One", "Two", "Three", 
"Four", "Five", "Six", "Seven", "Eigth", "Nine", "Ten");
+
+               // Publish the messages into PubSub
+               Publisher publisher = 
pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
+               input.forEach(s -> {
+                       try {
+                               publisher
+                                       .publish(PubsubMessage
+                                               .newBuilder()
+                                               
.setData(ByteString.copyFromUtf8(s))
+                                               .build())
+                                       .get();
+                       } catch (InterruptedException | ExecutionException e) {
+                               e.printStackTrace();
+                       }
+               });
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.enableCheckpointing(100);
+
+               DataStream<String> fromPubSub = env
+                       .addSource(BoundedPubSubSource.<String>newBuilder()
+                               .withDeserializationSchema(new 
SimpleStringSchema())
+                               .withProjectSubscriptionName(PROJECT_NAME, 
SUBSCRIPTION_NAME)
+                               // Specific for emulator
+                               
.withCredentialsProvider(getPubsubHelper().getCredentialsProvider())
+                               .withHostAndPort(getPubSubHostPort())
+                               // Make sure the test topology self terminates
+                               .boundedByTimeSinceLastMessage(1000)
+                               .build())
+                       .name("PubSub source");
+
+               List<String> output = new ArrayList<>();
+               fromPubSub.writeUsingOutputFormat(new 
LocalCollectionOutputFormat<>(output));
+
+               env.execute();
+
+               assertEquals("Wrong number of elements", input.size(), 
output.size());
+               for (String test : input) {
+                       assertTrue("Missing " + test, output.contains(test));
+               }
+       }
+
+}
diff --git 
a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
new file mode 100644
index 0000000..27a658a
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudEmulatorManager.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.spotify.docker.client.DefaultDockerClient;
+import com.spotify.docker.client.DockerClient;
+import com.spotify.docker.client.exceptions.ContainerNotFoundException;
+import com.spotify.docker.client.exceptions.DockerCertificateException;
+import com.spotify.docker.client.exceptions.DockerException;
+import com.spotify.docker.client.exceptions.ImageNotFoundException;
+import com.spotify.docker.client.messages.ContainerConfig;
+import com.spotify.docker.client.messages.ContainerCreation;
+import com.spotify.docker.client.messages.ContainerInfo;
+import com.spotify.docker.client.messages.HostConfig;
+import com.spotify.docker.client.messages.PortBinding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * The class that handles the starting and stopping of the emulator docker 
image.
+ */
+public class GCloudEmulatorManager {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GCloudEmulatorManager.class);
+
+       private static DockerClient docker;
+
+       private static String dockerIpAddress = "127.0.0.1";
+
+       public static final String INTERNAL_PUBSUB_PORT = "22222";
+       public static final String DOCKER_IMAGE_NAME = 
"google/cloud-sdk:latest";
+
+       private static String pubsubPort;
+
+       public static String getDockerIpAddress() {
+               if (dockerIpAddress == null) {
+                       throw new IllegalStateException("The docker has not yet 
been started (yet) so you cannot get the IP address yet.");
+               }
+               return dockerIpAddress;
+       }
+
+       public static String getDockerPubSubPort() {
+               if (pubsubPort == null) {
+                       throw new IllegalStateException("The docker has not yet 
been started (yet) so you cannot get the port information yet.");
+               }
+               return pubsubPort;
+       }
+
+       public static final String UNITTEST_PROJECT_ID = 
"running-from-junit-for-flink";
+       private static final String CONTAINER_NAME_JUNIT = (DOCKER_IMAGE_NAME + 
"_" + UNITTEST_PROJECT_ID).replaceAll("[^a-zA-Z0-9_]", "_");
+
+       public static void launchDocker() throws DockerException, 
InterruptedException, DockerCertificateException {
+               // Create a client based on DOCKER_HOST and DOCKER_CERT_PATH 
env vars
+               docker = DefaultDockerClient.fromEnv().build();
+
+               terminateAndDiscardAnyExistingContainers(true);
+
+               LOG.info("");
+               LOG.info("/===========================================");
+               LOG.info("| GCloud Emulator");
+
+               ContainerInfo containerInfo;
+               String id;
+
+               try {
+                       docker.inspectImage(DOCKER_IMAGE_NAME);
+               } catch (ImageNotFoundException e) {
+                       // No such image so we must download it first.
+                       LOG.info("| - Getting docker image \"{}\"", 
DOCKER_IMAGE_NAME);
+                       docker.pull(DOCKER_IMAGE_NAME, message -> {
+                               if (message.id() != null && message.progress() 
!= null) {
+                                       LOG.info("| - Downloading > {} : {}", 
message.id(), message.progress());
+                               }
+                       });
+               }
+
+               // No such container. Good, we create one!
+               LOG.info("| - Creating new container");
+
+               // Bind container ports to host ports
+               final Map<String, List<PortBinding>> portBindings = new 
HashMap<>();
+               portBindings.put(INTERNAL_PUBSUB_PORT, 
Collections.singletonList(PortBinding.randomPort("0.0.0.0")));
+
+               final HostConfig hostConfig = 
HostConfig.builder().portBindings(portBindings).build();
+
+               // Create new container with exposed ports
+               final ContainerConfig containerConfig = 
ContainerConfig.builder()
+                       .hostConfig(hostConfig)
+                       .exposedPorts(INTERNAL_PUBSUB_PORT)
+                       .image(DOCKER_IMAGE_NAME)
+                       .cmd("sh", "-c", "mkdir -p /opt/data/pubsub ; gcloud 
beta emulators pubsub start --data-dir=/opt/data/pubsub  --host-port=0.0.0.0:" 
+ INTERNAL_PUBSUB_PORT)
+                       .build();
+
+               final ContainerCreation creation = 
docker.createContainer(containerConfig, CONTAINER_NAME_JUNIT);
+               id = creation.id();
+
+               containerInfo = docker.inspectContainer(id);
+
+               if (!containerInfo.state().running()) {
+                       LOG.warn("| - Starting it up ....");
+                       docker.startContainer(id);
+                       Thread.sleep(1000);
+               }
+
+               containerInfo = docker.inspectContainer(id);
+
+               dockerIpAddress = "127.0.0.1";
+
+               Map<String, List<PortBinding>> ports = 
containerInfo.networkSettings().ports();
+
+               assertNotNull("Unable to retrieve the ports where to connect to 
the emulators", ports);
+               assertEquals("We expect 1 port to be mapped", 1, ports.size());
+
+               pubsubPort = getPort(ports, INTERNAL_PUBSUB_PORT, "PubSub");
+
+               LOG.info("| Waiting for the emulators to be running");
+
+               // PubSub exposes an "Ok" at the root url when running.
+               if (!waitForOkStatus("PubSub", pubsubPort)) {
+                       // Oops, we did not get an "Ok" within 10 seconds
+                       startHasFailedKillEverything();
+               }
+               LOG.info("\\===========================================");
+               LOG.info("");
+       }
+
+       private static void startHasFailedKillEverything() throws 
DockerException, InterruptedException {
+               LOG.error("|");
+               LOG.error("| ==================== ");
+               LOG.error("| YOUR TESTS WILL FAIL ");
+               LOG.error("| ==================== ");
+               LOG.error("|");
+
+               // Kill this container and wipe all connection information
+               dockerIpAddress = null;
+               pubsubPort = null;
+               terminateAndDiscardAnyExistingContainers(false);
+       }
+
+       private static final long MAX_RETRY_TIMEOUT = 10000; // Milliseconds
+
+       private static boolean waitForOkStatus(String label, String port) {
+               long start = System.currentTimeMillis();
+               while (true) {
+                       try {
+                               URL url = new URL("http://"; + dockerIpAddress + 
":" + port + "/");
+                               HttpURLConnection con = (HttpURLConnection) 
url.openConnection();
+                               con.setRequestMethod("GET");
+                               con.setConnectTimeout(50);
+                               con.setReadTimeout(50);
+
+                               BufferedReader in = new BufferedReader(new 
InputStreamReader(con.getInputStream()));
+                               String inputLine;
+                               StringBuilder content = new StringBuilder();
+                               while ((inputLine = in.readLine()) != null) {
+                                       content.append(inputLine);
+                               }
+                               in.close();
+                               con.disconnect();
+                               if (content.toString().contains("Ok")) {
+                                       LOG.info("| - {} Emulator is running at 
{}:{}", label, dockerIpAddress, port);
+                                       return true;
+                               }
+                       } catch (IOException e) {
+                               long now = System.currentTimeMillis();
+                               if (now - start > MAX_RETRY_TIMEOUT) {
+                                       LOG.error("| - PubSub Emulator at {}:{} 
FAILED to return an Ok status within {} ms ", dockerIpAddress, port, 
MAX_RETRY_TIMEOUT);
+                                       return false;
+                               }
+                               try {
+                                       Thread.sleep(100); // Sleep a very 
short time
+                               } catch (InterruptedException e1) {
+                                       // Ignore
+                               }
+                       }
+               }
+       }
+
+       private static String getPort(Map<String, List<PortBinding>> ports, 
String internalTCPPort, String label) {
+               List<PortBinding> portMappings = ports.get(internalTCPPort + 
"/tcp");
+               if (portMappings == null || portMappings.isEmpty()) {
+                       LOG.info("| {} Emulator {} --> NOTHING CONNECTED TO 
{}", label, internalTCPPort + "/tcp");
+                       return null;
+               }
+
+               return portMappings.get(0).hostPort();
+       }
+
+       private static void terminateAndDiscardAnyExistingContainers(boolean 
warnAboutExisting) throws DockerException, InterruptedException {
+               ContainerInfo containerInfo;
+               try {
+                       containerInfo = 
docker.inspectContainer(CONTAINER_NAME_JUNIT);
+                       // Already have this container running.
+
+                       assertNotNull("We should either we get containerInfo or 
we get an exception", containerInfo);
+
+                       LOG.info("");
+                       
LOG.info("/===========================================");
+                       if (warnAboutExisting) {
+                               LOG.warn("|    >>> FOUND OLD EMULATOR INSTANCE 
RUNNING <<< ");
+                               LOG.warn("| Destroying that one to keep tests 
running smoothly.");
+                       }
+                       LOG.info("| Cleanup of GCloud Emulator");
+
+                       // We REQUIRE 100% accurate side effect free unit tests
+                       // So we completely discard this one.
+
+                       String id = containerInfo.id();
+                       // Kill container
+                       if (containerInfo.state().running()) {
+                               docker.killContainer(id);
+                               LOG.info("| - Killed");
+                       }
+
+                       // Remove container
+                       docker.removeContainer(id);
+
+                       LOG.info("| - Removed");
+                       
LOG.info("\\===========================================");
+                       LOG.info("");
+
+               } catch (ContainerNotFoundException cnfe) {
+                       // No such container. Good !
+               }
+       }
+
+       public static void terminateDocker() throws DockerException, 
InterruptedException {
+               terminateAndDiscardAnyExistingContainers(false);
+
+               // Close the docker client
+               docker.close();
+       }
+
+       // 
====================================================================================
+
+}
diff --git 
a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
new file mode 100644
index 0000000..b6a011a
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/GCloudUnitTestBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.spotify.docker.client.exceptions.DockerException;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Serializable;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.flink.streaming.connectors.pubsub.emulator.GCloudEmulatorManager.getDockerIpAddress;
+import static 
org.apache.flink.streaming.connectors.pubsub.emulator.GCloudEmulatorManager.getDockerPubSubPort;
+
+/**
+ * The base class from which unit tests should inherit if they need to use the 
Google cloud emulators.
+ */
+public class GCloudUnitTestBase implements Serializable {
+       @BeforeClass
+       public static void launchGCloudEmulator() throws Exception {
+               // Separated out into separate class so the entire test class 
to be serializable
+               GCloudEmulatorManager.launchDocker();
+       }
+
+       @AfterClass
+       public static void terminateGCloudEmulator() throws DockerException, 
InterruptedException {
+               GCloudEmulatorManager.terminateDocker();
+       }
+
+       // 
====================================================================================
+       // Pubsub helpers
+
+       private static ManagedChannel channel = null;
+       private static TransportChannelProvider channelProvider = null;
+       private static CredentialsProvider credentialsProvider = null;
+
+       public static PubsubHelper getPubsubHelper() {
+               if (channel == null) {
+                       //noinspection deprecation
+                       channel = ManagedChannelBuilder
+                               .forTarget(getPubSubHostPort())
+                               .usePlaintext(true)
+                               .build();
+                       channelProvider = FixedTransportChannelProvider
+                               .create(GrpcTransportChannel.create(channel));
+                       credentialsProvider = NoCredentialsProvider.create();
+               }
+               return new PubsubHelper(channelProvider, credentialsProvider);
+       }
+
+       public static String getPubSubHostPort() {
+               return getDockerIpAddress() + ":" + getDockerPubSubPort();
+       }
+
+       @AfterClass
+       public static void cleanupPubsubChannel() throws InterruptedException {
+               if (channel != null) {
+                       channel.shutdownNow().awaitTermination(1, SECONDS);
+                       channel = null;
+               }
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java
 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java
new file mode 100644
index 0000000..f08576b
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/pubsub/emulator/PubsubHelper.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.emulator;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.rpc.NotFoundException;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A helper class to make managing the testing topics a bit easier.
+ */
+public class PubsubHelper {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(PubsubHelper.class);
+
+       private TransportChannelProvider channelProvider = null;
+       private CredentialsProvider credentialsProvider = null;
+
+       private TopicAdminClient topicClient;
+       private SubscriptionAdminClient subscriptionAdminClient;
+
+       public PubsubHelper() {
+               this(TopicAdminSettings.defaultTransportChannelProvider(),
+                       
TopicAdminSettings.defaultCredentialsProviderBuilder().build());
+       }
+
+       public PubsubHelper(TransportChannelProvider channelProvider, 
CredentialsProvider credentialsProvider) {
+               this.channelProvider = channelProvider;
+               this.credentialsProvider = credentialsProvider;
+       }
+
+       public TransportChannelProvider getChannelProvider() {
+               return channelProvider;
+       }
+
+       public CredentialsProvider getCredentialsProvider() {
+               return credentialsProvider;
+       }
+
+       public TopicAdminClient getTopicAdminClient() throws IOException {
+               if (topicClient == null) {
+                       TopicAdminSettings topicAdminSettings = 
TopicAdminSettings.newBuilder()
+                               .setTransportChannelProvider(channelProvider)
+                               .setCredentialsProvider(credentialsProvider)
+                               .build();
+                       topicClient = 
TopicAdminClient.create(topicAdminSettings);
+               }
+               return topicClient;
+       }
+
+       public Topic createTopic(String project, String topic) throws 
IOException {
+               deleteTopic(project, topic);
+               ProjectTopicName topicName = ProjectTopicName.of(project, 
topic);
+               TopicAdminClient adminClient = getTopicAdminClient();
+               LOG.info("CreateTopic {}", topicName);
+               return adminClient.createTopic(topicName);
+       }
+
+       public void deleteTopic(String project, String topic) throws 
IOException {
+               deleteTopic(ProjectTopicName.of(project, topic));
+       }
+
+       public void deleteTopic(ProjectTopicName topicName) throws IOException {
+//        LOG.info("CreateTopic {}", topicName);
+               TopicAdminClient adminClient = getTopicAdminClient();
+               try {
+                       Topic existingTopic = adminClient.getTopic(topicName);
+
+                       // If it exists we delete all subscriptions and the 
topic itself.
+                       LOG.info("DeleteTopic {} first delete old 
subscriptions.", topicName);
+                       adminClient
+                               .listTopicSubscriptions(topicName)
+                               .iterateAllAsProjectSubscriptionName()
+                               
.forEach(subscriptionAdminClient::deleteSubscription);
+                       LOG.info("DeleteTopic {}", topicName);
+                       adminClient
+                               .deleteTopic(topicName);
+               } catch (NotFoundException e) {
+                       // Doesn't exist. Good.
+               }
+       }
+
+       public SubscriptionAdminClient getSubscriptionAdminClient() throws 
IOException {
+               if (subscriptionAdminClient == null) {
+                       SubscriptionAdminSettings subscriptionAdminSettings =
+                               SubscriptionAdminSettings
+                                       .newBuilder()
+                                       
.setTransportChannelProvider(channelProvider)
+                                       
.setCredentialsProvider(credentialsProvider)
+                                       .build();
+                       subscriptionAdminClient = 
SubscriptionAdminClient.create(subscriptionAdminSettings);
+               }
+               return subscriptionAdminClient;
+       }
+
+       public void createSubscription(String subscriptionProject, String 
subscription, String topicProject, String topic) throws IOException {
+               ProjectSubscriptionName subscriptionName = 
ProjectSubscriptionName.newBuilder()
+                       .setProject(subscriptionProject)
+                       .setSubscription(subscription)
+                       .build();
+
+               deleteSubscription(subscriptionName);
+
+               SubscriptionAdminClient adminClient = 
getSubscriptionAdminClient();
+
+               ProjectTopicName topicName = ProjectTopicName.of(topicProject, 
topic);
+
+               PushConfig pushConfig = PushConfig.getDefaultInstance();
+
+               LOG.info("CreateSubscription {}", subscriptionName);
+               
getSubscriptionAdminClient().createSubscription(subscriptionName, topicName, 
pushConfig, 1);
+       }
+
+       public void deleteSubscription(String subscriptionProject, String 
subscription) throws IOException {
+               deleteSubscription(ProjectSubscriptionName
+                       .newBuilder()
+                       .setProject(subscriptionProject)
+                       .setSubscription(subscription)
+                       .build());
+       }
+
+       public void deleteSubscription(ProjectSubscriptionName 
subscriptionName) throws IOException {
+               SubscriptionAdminClient adminClient = 
getSubscriptionAdminClient();
+               try {
+                       adminClient.getSubscription(subscriptionName);
+                       // If it already exists we must first delete it.
+                       LOG.info("DeleteSubscription {}", subscriptionName);
+                       adminClient.deleteSubscription(subscriptionName);
+               } catch (NotFoundException e) {
+                       // Doesn't exist. Good.
+               }
+       }
+
+       // Mostly copied from the example on 
https://cloud.google.com/pubsub/docs/pull
+       public List<ReceivedMessage> pullMessages(String projectId, String 
subscriptionId, int maxNumberOfMessages) throws Exception {
+               SubscriberStubSettings subscriberStubSettings =
+                       SubscriberStubSettings.newBuilder()
+                               .setTransportChannelProvider(channelProvider)
+                               .setCredentialsProvider(credentialsProvider)
+                               .build();
+               try (SubscriberStub subscriber = 
GrpcSubscriberStub.create(subscriberStubSettings)) {
+                       // String projectId = "my-project-id";
+                       // String subscriptionId = "my-subscription-id";
+                       // int numOfMessages = 10;   // max number of messages 
to be pulled
+                       String subscriptionName = 
ProjectSubscriptionName.format(projectId, subscriptionId);
+                       PullRequest pullRequest =
+                               PullRequest.newBuilder()
+                                       .setMaxMessages(maxNumberOfMessages)
+                                       .setReturnImmediately(false) // return 
immediately if messages are not available
+                                       .setSubscription(subscriptionName)
+                                       .build();
+
+                       // use pullCallable().futureCall to asynchronously 
perform this operation
+                       PullResponse pullResponse = 
subscriber.pullCallable().call(pullRequest);
+                       List<String> ackIds = new ArrayList<>();
+                       for (ReceivedMessage message : 
pullResponse.getReceivedMessagesList()) {
+                               // handle received message
+                               // ...
+                               ackIds.add(message.getAckId());
+                       }
+                       // acknowledge received messages
+                       AcknowledgeRequest acknowledgeRequest =
+                               AcknowledgeRequest.newBuilder()
+                                       .setSubscription(subscriptionName)
+                                       .addAllAckIds(ackIds)
+                                       .build();
+                       // use acknowledgeCallable().futureCall to 
asynchronously perform this operation
+                       
subscriber.acknowledgeCallable().call(acknowledgeRequest);
+                       return pullResponse.getReceivedMessagesList();
+               }
+       }
+
+       public Subscriber subscribeToSubscription(String project, String 
subscription, MessageReceiver messageReceiver) {
+               ProjectSubscriptionName subscriptionName = 
ProjectSubscriptionName.of(project, subscription);
+               Subscriber subscriber =
+                       Subscriber
+                               .newBuilder(subscriptionName, messageReceiver)
+                               .setChannelProvider(channelProvider)
+                               .setCredentialsProvider(credentialsProvider)
+                               .build();
+               subscriber.startAsync();
+               return subscriber;
+       }
+
+       public Publisher createPublisher(String project, String topic) throws 
IOException {
+               return Publisher
+                       .newBuilder(ProjectTopicName.of(project, topic))
+                       .setChannelProvider(channelProvider)
+                       .setCredentialsProvider(credentialsProvider)
+                       .build();
+       }
+
+}
diff --git 
a/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties
 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..b316a9a
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+log4j.rootLogger=INFO, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.out
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 0950e2f..b17dc70 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -61,6 +61,7 @@ under the License.
                <module>flink-metrics-availability-test</module>
                <module>flink-metrics-reporter-prometheus-test</module>
                <module>flink-heavy-deployment-stress-test</module>
+               <module>flink-connector-pubsub-emulator-tests</module>
                <module>flink-streaming-kafka-test-base</module>
                <module>flink-streaming-kafka-test</module>
                <module>flink-streaming-kafka011-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index af21512..957b3c6 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -136,6 +136,8 @@ run_test "Elasticsearch (v6.3.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scr
 run_test "Quickstarts Java nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
 run_test "Quickstarts Scala nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
 
+run_test "Test PubSub connector with Docker based Google PubSub Emulator" 
"$END_TO_END_DIR/test-scripts/test_streaming_pubsub.sh"
+
 run_test "Avro Confluent Schema Registry nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
 
 run_test "State TTL Heap backend end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh 
b/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh
new file mode 100755
index 0000000..8e08385
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_pubsub.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+cd "${END_TO_END_DIR}/flink-connector-pubsub-emulator-tests"
+
+mvn test -DskipTests=false
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
index 612da41..c5791bd 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/pubsub/PubSubExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.examples.pubsub;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.pubsub.PubSubSink;
-import org.apache.flink.streaming.connectors.pubsub.PubSubSourceBuilder;
+import org.apache.flink.streaming.connectors.pubsub.PubSubSource;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public class PubSubExample {
                        System.out.println("Missing parameters!\n" +
                                                                "Usage: flink 
run PubSub.jar --input-subscription <subscription> --input-topicName <topic> 
--output-topicName " +
                                                                
"--google-project <google project name> ");
-                       //return;
+                       return;
                }
 
                String projectName = 
parameterTool.getRequired("google-project");
@@ -62,10 +62,9 @@ public class PubSubExample {
        private static void runFlinkJob(String projectName, String 
subscriptionName, String outputTopicName) throws Exception {
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-               env.addSource(PubSubSourceBuilder.<Integer>builder()
+               env.addSource(PubSubSource.<Integer>newBuilder()
                                                                                
.withProjectSubscriptionName(projectName, subscriptionName)
                                                                                
.withDeserializationSchema(new IntegerSerializer())
-                                                                               
.withMode(PubSubSourceBuilder.Mode.NONE)
                                                                                
.build())
                        .map(PubSubExample::printAndReturn).disableChaining()
                        .addSink(PubSubSink.<Integer>newBuilder()

Reply via email to