[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232700234
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * This class defines a bound based on messages received or time since last received message.
+ * Using start(SourceFunction) starts the bound. Everytime a message is received the sourceFunction should call receivedMessage().
+ * When the bound is reached, the sourcefunction gets closed by calling sourceFunction.close()
+ * See {@link BoundedPubSubSource}.
+ *
+ * @param  type of message that is received by the SourceFunction.
+ */
+class Bound implements Serializable {
+   private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+   private final Bound.Mode mode;
+   private final long maxMessagedReceived;
+   private final long maxTimeBetweenMessages;
+
+   private SourceFunction sourceFunction;
+   private transient Timer timer;
+   private long messagesReceived;
+   private long lastReceivedMessage;
+   private boolean cancelled = false;
+
+   private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) {
+   this.mode = mode;
+   this.maxMessagedReceived = maxMessagedReceived;
+   this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+   this.messagesReceived = 0L;
 
 Review comment:
   Fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232699534
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+/**
+ * Tests for {@link BoundedPubSubSource}.
+ */
+public class BoundedPubSubSourceTest {
+   private final Bound bound = mock(Bound.class);
+   private final SubscriberWrapper subscriberWrapper = mock(SubscriberWrapper.class);
+   private final SourceFunction.SourceContext sourceContext = mock(SourceFunction.SourceContext.class);
+   private final AckReplyConsumer ackReplyConsumer = mock(AckReplyConsumer.class);
+   private final DeserializationSchema deserializationSchema = mock(DeserializationSchema.class);
+   private final MetricGroup metricGroup = mock(MetricGroup.class);
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232697684
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+/**
+ * Test for {@link Bound}.
+ */
+public class BoundTest {
+   private SourceFunction sourceFunction = mock(SourceFunction.class);
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232690036
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.common;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.auth.Credentials;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * Wrapper class for CredentialsProvider to make it Serializable. This can be used to pass on Credentials to SourceFunctions
+ */
+public class SerializableCredentialsProvider implements CredentialsProvider, Serializable {
+   private final Credentials credentials;
 
 Review comment:
   All `Credentials` implementations are in fact serializable:
   
   public abstract class Credentials implements Serializable {
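
A minimal sketch of why that matters for the wrapper, using only the JDK. All class names here (`DemoCredentials`, `StaticDemoCredentials`, `DemoCredentialsHolder`) are hypothetical stand-ins and not part of the Google auth library or this pull request. It assumes only what the comment states: the abstract base type implements `Serializable`, so a wrapper can hold it in a plain non-transient field and still survive Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for an abstract, Serializable credentials base type.
abstract class DemoCredentials implements Serializable {
    abstract String token();
}

// Hypothetical concrete implementation; it inherits Serializable from the base.
class StaticDemoCredentials extends DemoCredentials {
    private final String token;

    StaticDemoCredentials(String token) {
        this.token = token;
    }

    @Override
    String token() {
        return token;
    }
}

// Hypothetical wrapper: the field needs no 'transient' because its type is Serializable.
class DemoCredentialsHolder implements Serializable {
    private final DemoCredentials credentials;

    DemoCredentialsHolder(DemoCredentials credentials) {
        this.credentials = credentials;
    }

    DemoCredentials getCredentials() {
        return credentials;
    }

    // Serializes the holder to bytes and reads it back, the way a framework
    // would when shipping a function object to a remote worker.
    static DemoCredentialsHolder roundTrip(DemoCredentialsHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoCredentialsHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

If the field type were not serializable, `writeObject` would fail with a `NotSerializableException`, which is the situation the original review question was worried about.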
   




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232687817
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * This class defines a bound based on messages received or time since last received message.
+ * Using start(SourceFunction) starts the bound. Everytime a message is received the sourceFunction should call receivedMessage().
+ * When the bound is reached, the sourcefunction gets closed by calling sourceFunction.close()
+ * See {@link BoundedPubSubSource}.
+ *
+ * @param  type of message that is received by the SourceFunction.
+ */
+class Bound implements Serializable {
+   private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+   private final Bound.Mode mode;
+   private final long maxMessagedReceived;
+   private final long maxTimeBetweenMessages;
+
+   private SourceFunction sourceFunction;
+   private transient Timer timer;
+   private long messagesReceived;
+   private long lastReceivedMessage;
+   private boolean cancelled = false;
+
+   private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) {
+   this.mode = mode;
+   this.maxMessagedReceived = maxMessagedReceived;
+   this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+   this.messagesReceived = 0L;
+   }
+
+   static  Bound boundByAmountOfMessages(long maxMessagedReceived) {
+   return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+   }
+
+   static  Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) {
+   return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+   }
+
+   static  Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) {
+   return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages);
+   }
+
+   private TimerTask shutdownPubSubSource() {
+   return new TimerTask() {
+   @Override
+   public void run() {
+   if (maxTimeBetweenMessagesElapsed()) {
+   cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+   timer.cancel();
+   }
+   }
+   };
+   }
+
+   private synchronized boolean maxTimeBetweenMessagesElapsed() {
+   return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages;
+   }
+
+   private synchronized void cancelPubSubSource(String logMessage) {
+   if (!cancelled) {
+   cancelled = true;
+   sourceFunction.cancel();
+   LOG.info(logMessage);
+   }
+   }
+
+   void start(SourceFunction sourceFunction) {
+   if (this.sourceFunction != null) {
+   throw new IllegalStateException("start() already called");
+   }
+
+   this.sourceFunction = sourceFunction;
+   messagesReceived = 0;
+
+   if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+   lastReceivedMessage = System.currentTimeMillis();
+   timer = new Timer();
+   timer.schedule(shutdownPubSubSource(), 0, 100);
+   }
+   }
+
+   synchronized void receivedMessage() {
+   if (sourceFunction == null) {
+   throw new 

[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232626013
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -62,6 +66,17 @@ under the License.
${project.version}

 
+   <dependency>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
 
 Review comment:
   I double-checked, and all the other connectors also use `<groupId>org.apache.flink</groupId>`, so I'm not going to change this one.




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232681463
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction {
+   private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class);
+
+   private SerializableCredentialsProvider serializableCredentialsProvider;
+   private SerializationSchema serializationSchema;
+   private String projectName;
+   private String topicName;
+   private String hostAndPort = null;
+
+   private transient Publisher publisher;
+
+   private PubSubSink() {
+   }
+
+   void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) {
+   this.serializableCredentialsProvider = serializableCredentialsProvider;
+   }
+
+   void setSerializationSchema(SerializationSchema serializationSchema) {
+   this.serializationSchema = serializationSchema;
+   }
+
+   void setProjectName(String projectName) {
+   this.projectName = projectName;
+   }
+
+   void setTopicName(String topicName) {
+   this.topicName = topicName;
+   }
+
+   /**
+* Set the custom hostname/port combination of PubSub.
+* The ONLY reason to use this is during tests with the emulator provided by Google.
+*
+* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234")
+*/
+   void withHostAndPort(String hostAndPort) {
+   this.hostAndPort = hostAndPort;
+   }
+
+   void initialize() throws IOException {
+   if (serializableCredentialsProvider == null) {
+   serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables();
+   }
+   if (serializationSchema == null) {
+   throw new IllegalArgumentException("The serializationSchema has not been specified.");
+   }
+   if (projectName == null) {
+   throw new IllegalArgumentException("The projectName has not been specified.");
+   }
+   if (topicName == null) {
+   throw new IllegalArgumentException("The topicName has not been specified.");
+   }
+   }
+
+
+   private transient ManagedChannel managedChannel = null;
+   private transient TransportChannel channel = null;
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   Publisher.Builder builder = Publisher
+   .newBuilder(ProjectTopicName.of(projectName, topicName))
+

[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232637001
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.core.ApiService;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper class around a PubSub {@link Subscriber}.
+ * This class makes it easier to connect to a Non Google PubSub service such as a local PubSub emulator or docker container.
+ */
+class SubscriberWrapper implements Serializable, MessageReceiver {
+   private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class);
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636943
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,106 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/linking.html)
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636872
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/PubSubSubscriberFactory.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub.common;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+
+import java.io.Serializable;
+
+/**
+ * A factory class to create a {@link Subscriber}.
+ * This allows for customized Subscribers with for instance tweaked 
configurations
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636963
 
 

 ##
 File path: flink-connectors/flink-connector-pubsub/pom.xml
 ##
 @@ -0,0 +1,103 @@
+
+
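+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-connectors</artifactId>
+   <version>1.7-SNAPSHOT</version>
+   <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
+   <name>flink-connector-pubsub</name>
+
+   <packaging>jar</packaging>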
+
+   
+   
+   
+   
+   <groupId>com.google.cloud</groupId>
+   <artifactId>google-cloud-bom</artifactId>
+   <version>0.53.0-alpha</version>
 
 Review comment:
   That was the current version when we wrote it. I have just updated it to 0.70 ...




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636831
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received.
 
 Review comment:
   The MultipleIdsMessageAcknowledgingSourceBase buffers the message handles
   and only acks them AFTER the checkpoint has completed. If there is a crash
   before the checkpoint, the message has not yet been acked and will be
   redelivered (after the ack timeout). If there is a crash after the
   checkpoint, the message is persisted as part of the checkpoint.
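
   A minimal sketch of that ack-after-checkpoint ordering, in plain Java without Flink or PubSub dependencies. The names `AckOnCheckpointBuffer`, `onMessage`, `snapshot` and `checkpointComplete` are hypothetical, and this is not the actual implementation of MultipleIdsMessageAcknowledgingSourceBase; it only illustrates why a crash before the checkpoint leads to redelivery rather than data loss.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Illustrative only: holds ack ids until the checkpoint containing them completes. */
final class AckOnCheckpointBuffer {
    // Ids received since the last snapshot; not yet tied to any checkpoint.
    private List<String> sinceLastSnapshot = new ArrayList<>();
    // Ids frozen by each snapshot, keyed by checkpoint id, still unacked.
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    // Stand-in for the call that acknowledges a message towards the broker.
    private final Consumer<String> acker;

    AckOnCheckpointBuffer(Consumer<String> acker) {
        this.acker = acker;
    }

    /** Called for every message emitted downstream; nothing is acked here. */
    void onMessage(String ackId) {
        sinceLastSnapshot.add(ackId);
    }

    /** Called when a checkpoint is taken: freeze the ids that belong to it. */
    void snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, sinceLastSnapshot);
        sinceLastSnapshot = new ArrayList<>();
    }

    /** Called once a checkpoint has completed: ack everything up to and including it. */
    void checkpointComplete(long checkpointId) {
        Map<Long, List<String>> done = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> ids : done.values()) {
            ids.forEach(acker);
        }
        done.clear(); // the view is backed by the map, so this drops the acked entries
    }

    /** Number of ids the broker would redeliver if the job crashed right now. */
    int unackedCount() {
        int count = sinceLastSnapshot.size();
        for (List<String> ids : pendingByCheckpoint.values()) {
            count += ids.size();
        }
        return count;
    }
}

final class AckAfterCheckpointDemo {
    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        AckOnCheckpointBuffer buffer = new AckOnCheckpointBuffer(acked::add);

        buffer.onMessage("m1");
        buffer.onMessage("m2");
        buffer.snapshot(1L);
        buffer.onMessage("m3");          // arrives after the snapshot
        System.out.println(acked);       // nothing has been acked yet

        buffer.checkpointComplete(1L);
        System.out.println(acked);       // only the ids covered by checkpoint 1
        System.out.println(buffer.unackedCount());
    }
}
```

   In this sketch "m3" stays unacked until a later checkpoint completes, which is the window in which a crash causes a redelivery.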


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636540
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory;
+
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
+   private String hostAndPort;
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636556
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory;
+
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
+   private String hostAndPort;
+
+   void withHostAndPort(String hostAndPort) {
+       this.hostAndPort = hostAndPort;
+   }
+
+   @Override
+   public Subscriber getSubscriber(CredentialsProvider credentialsProvider, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) {
+       FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
+           .setMaxOutstandingElementCount(1L)
+           .setMaxOutstandingRequestBytes(10L)
+           .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
+           .build();
+       Subscriber.Builder builder = Subscriber
+           .newBuilder(ProjectSubscriptionName.of(projectSubscriptionName.getProject(), projectSubscriptionName.getSubscription()), messageReceiver)
+           .setFlowControlSettings(flowControlSettings)
+           .setCredentialsProvider(credentialsProvider);
+
+       if (hostAndPort != null) {
+           ManagedChannel managedChannel = ManagedChannelBuilder
+               .forTarget(hostAndPort)
+               .usePlaintext() // This is 'Ok' because this is ONLY used for testing.
 
 Review comment:
   Yes, in practice the host and port are only used in testing scenarios.
   That is why I put the comment on the "don't do any encryption" method.
   I'll see if I can find a more suitable spot.
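
   One possible "more suitable spot" is the branch that decides between the test override and the default endpoint. The sketch below is library-free and hypothetical: `TransportChoice`, `forOverride` and `DEFAULT_TARGET` are names made up for illustration, and the default target string is an assumption, not something taken from this PR.

```java
import java.util.Optional;

/**
 * Hypothetical model of the transport decision: plaintext is chosen only
 * when a host/port override (for example a local emulator) is supplied.
 */
final class TransportChoice {
    // Assumption for illustration: the public Pub/Sub gRPC endpoint.
    static final String DEFAULT_TARGET = "pubsub.googleapis.com:443";

    final String target;
    final boolean plaintext;

    private TransportChoice(String target, boolean plaintext) {
        this.target = target;
        this.plaintext = plaintext;
    }

    /** Picks the transport from an optional test-only host/port override. */
    static TransportChoice forOverride(Optional<String> testHostAndPort) {
        if (testHostAndPort.isPresent()) {
            // Test-only path: encryption is switched off here and nowhere else.
            return new TransportChoice(testHostAndPort.get(), true);
        }
        // Normal path: default endpoint, never plaintext.
        return new TransportChoice(DEFAULT_TARGET, false);
    }
}

final class TransportChoiceDemo {
    public static void main(String[] args) {
        TransportChoice test = TransportChoice.forOverride(Optional.of("localhost:1234"));
        TransportChoice prod = TransportChoice.forOverride(Optional.empty());
        System.out.println(test.target + " plaintext=" + test.plaintext);
        System.out.println(prod.target + " plaintext=" + prod.plaintext);
    }
}
```

   Keeping the explanation on the branch rather than on the `usePlaintext()` call ties it to the condition that makes it safe.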




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636463
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory;
+
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
+   private String hostAndPort;
+
+   void withHostAndPort(String hostAndPort) {
+       this.hostAndPort = hostAndPort;
+   }
+
+   @Override
+   public Subscriber getSubscriber(CredentialsProvider credentialsProvider, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) {
+       FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
+           .setMaxOutstandingElementCount(1L)
+           .setMaxOutstandingRequestBytes(10L)
 
 Review comment:
   These are not empty lines. This is yet another indentation problem.
   Fixed.




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636422
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+
+/**
+ * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point.
+ * For example after a period of being idle or after n messages have been received.
+ */
+public class BoundedPubSubSource extends PubSubSource {
 
 Review comment:
   We chose to split these two to keep as much of the "testing code" out of the 
normal runtime and thus avoid polluting it. Only the host/port thing remains 
(which was your other comment about this).




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636122
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,106 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/linking.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+ PubSub Source
+
+The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an at-least-once guarantee.
+
+The class `PubSubSource(…)` has a builder to create PubSubSources: `PubSubSource.newBuilder()`.
+
+There are several optional methods to alter how the PubSubSource is created. The bare minimum is to provide a Google project, a PubSub subscription, and a way to deserialize the PubSubMessages.
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636183
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,106 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/linking.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+ PubSub Source
+
+The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an at-least-once guarantee.
+
+The class `PubSubSource(…)` has a builder to create PubSubSources: `PubSubSource.newBuilder()`.
+
+There are several optional methods to alter how the PubSubSource is created. The bare minimum is to provide a Google project, a PubSub subscription, and a way to deserialize the PubSubMessages.
+Example:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DeserializationSchema deserializationSchema = (...);
+SourceFunction pubsubSource = PubSubSource.newBuilder()
 
 Review comment:
   Yes




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636206
 
 

 ##
 File path: flink-connectors/flink-connector-pubsub/pom.xml
 ##
 @@ -0,0 +1,103 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-connector-pubsub_${scala.binary.version}
+   flink-connector-pubsub
+
+   jar
+
+   
+   
+   
+   
+   com.google.cloud
+   google-cloud-bom
+   0.53.0-alpha
+   pom
+   import
+   
+   
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+
+   
+   com.google.cloud
+   google-cloud-pubsub
+   
+   
+   
+   
+   com.google.guava
+   guava-jdk5
+   
+   
+   provided
+   
+
+   
+   org.slf4j
+   slf4j-api
 
 Review comment:
   Yes, that makes it cleaner. Fixed.




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636055
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java
 ##
 @@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.core.ApiService;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.apache.flink.api.java.ClosureCleaner.ensureSerializable;
+import static org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider.withoutCredentials;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SubscriberWrapper}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriberWrapperTest {
+   @Mock
+   private PubSubSubscriberFactory pubSubSubscriberFactory;
+
+   @Mock
+   private Subscriber subscriber;
+
+   @Mock
+   private ApiService apiService;
+
+   private PubsubMessage pubsubMessage = pubSubMessage();
+
+   @Mock
+   private AckReplyConsumer ackReplyConsumer;
+
+   private SubscriberWrapper subscriberWrapper;
+
+   @Before
+   public void setup() throws Exception {
+       when(pubSubSubscriberFactory.getSubscriber(any(), any(), any())).thenReturn(subscriber);
+       subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), pubSubSubscriberFactory);
+   }
+
+   @Test
+   public void testSerializedSubscriberBuilder() throws Exception {
+       SubscriberWrapper subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), SubscriberWrapperTest::subscriberFactory);
+       ensureSerializable(subscriberWrapper);
+   }
+
+   @Test
+   public void testInitialisation() {
+       SerializableCredentialsProvider credentialsProvider = withoutCredentials();
+       ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of("projectId", "subscriptionId");
+       SubscriberWrapper subscriberWrapper = new SubscriberWrapper(credentialsProvider, projectSubscriptionName, pubSubSubscriberFactory);
+
+       subscriberWrapper.initialize();
+       verify(pubSubSubscriberFactory, times(1)).getSubscriber(credentialsProvider, projectSubscriptionName, subscriberWrapper);
+   }
+
+   @Test
+   public void testStart() {
+       when(subscriber.startAsync()).thenReturn(apiService);
+       subscriberWrapper.initialize();
+
+       subscriberWrapper.start();
+       verify(apiService, times(1)).awaitRunning();
+       assertThat(subscriberWrapper.amountOfMessagesInBuffer(), is(0));
+   }
+
+   @Test
+   public void testStopWithoutInitialize() {
+       subscriberWrapper.stop();
+       verifyZeroInteractions(subscriber);
+   }
+   @Test
+   public void 

[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636179
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,106 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/linking.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+ PubSub Source
+
+The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an at-least-once guarantee.
+
+The class `PubSubSource(…)` has a builder to create PubSubSources: `PubSubSource.newBuilder()`.
+
+There are several optional methods to alter how the PubSubSource is created. The bare minimum is to provide a Google project, a PubSub subscription, and a way to deserialize the PubSubMessages.
+Example:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DeserializationSchema deserializationSchema = (...);
+SourceFunction pubsubSource = PubSubSource.newBuilder()
+    .withDeserializationSchema(deserializationSchema)
+    .withProjectSubscriptionName("google-project-name", "pubsub-subscription")
+    .build();
+
+streamExecEnv.addSource(pubsubSource);
+{% endhighlight %}
+
+
+
+ PubSub Sink
+
+The connector provides a Sink for writing data to PubSub.
+
+The class `PubSubSink(…)` has a builder to create PubSubSinks: `PubSubSink.newBuilder()`.
+
+This builder works in a similar way to the PubSubSource.
+Example:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+SerializationSchema serializationSchema = (...);
+SinkFunction pubsubSink = PubSubSink.newBuilder()
+    .withSerializationSchema(serializationSchema)
+    .withTopicName("pubsub-topic-name")
+    .withProjectName("google-project-name")
+    .build();
+
+streamExecEnv.addSink(pubsubSink);
+{% endhighlight %}
+
+
+
+ Google Credentials
+
+Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google cloud resources such as PubSub. Both builders allow several ways to provide these credentials.
+
+By default the connectors will look for an environment variable: [GOOGLE_APPLICATION_CREDENTIALS](https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually) which should point to a file containing the credentials.
+
+It is also possible to provide a Credentials object directly, for instance if you read the Credentials yourself from an external system. In this case you can use `PubSubSource.newBuilder().withCredentials(...)`.
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636196
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,106 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/linking.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+ PubSub Source
+
+The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an at-least-once guarantee.
+
+The class `PubSubSource(…)` has a builder to create PubSubSources: `PubSubSource.newBuilder()`.
+
+There are several optional methods to alter how the PubSubSource is created. The bare minimum is to provide a Google project, a PubSub subscription, and a way to deserialize the PubSubMessages.
+Example:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DeserializationSchema deserializationSchema = (...);
+SourceFunction pubsubSource = PubSubSource.newBuilder()
+    .withDeserializationSchema(deserializationSchema)
+    .withProjectSubscriptionName("google-project-name", "pubsub-subscription")
+    .build();
+
+streamExecEnv.addSource(pubsubSource);
+{% endhighlight %}
+
+
+
+ PubSub Sink
+
+The connector provides a Sink for writing data to PubSub.
+
+The class `PubSubSink(…)` has a builder to create PubSubSinks: `PubSubSink.newBuilder()`.
+
+This builder works in a similar way to the PubSubSource.
+Example:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+SerializationSchema serializationSchema = (...);
+SinkFunction pubsubSink = PubSubSink.newBuilder()
+    .withSerializationSchema(serializationSchema)
+    .withTopicName("pubsub-topic-name")
+    .withProjectName("google-project-name")
+    .build();
+
+streamExecEnv.addSink(pubsubSink);
+{% endhighlight %}
+
+
+
+ Google Credentials
+
+Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google cloud resources such as PubSub. Both builders allow several ways to provide these credentials.
+
+By default the connectors will look for an environment variable: [GOOGLE_APPLICATION_CREDENTIALS](https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually) which should point to a file containing the credentials.
+
+It is also possible to provide a Credentials object directly, for instance if you read the Credentials yourself from an external system. In this case you can use `PubSubSource.newBuilder().withCredentials(...)`.
+
+ Integration testing
+
+When running integration tests you might not want to connect to PubSub directly but instead read from and write to a docker container. This is possible by using `PubSubSource.newBuilder().withHostAndPort("localhost:1234")`.
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636212
 
 

 ##
 File path: flink-connectors/flink-connector-pubsub/pom.xml
 ##
 @@ -0,0 +1,103 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-connectors
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-connector-pubsub_${scala.binary.version}
+   flink-connector-pubsub
+
+   jar
+
+   
+   
+   
+   
+   com.google.cloud
+   google-cloud-bom
+   0.53.0-alpha
+   pom
+   import
+   
+   
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+
+   
+   com.google.cloud
+   google-cloud-pubsub
+   
+   
+   
+   
+   com.google.guava
+   guava-jdk5
+   
+   
+   provided
+   
+
+   
+   org.slf4j
+   slf4j-api
+   ${slf4j.version}
+   
+
+   
+   org.slf4j
+   slf4j-log4j12
 
 Review comment:
   Yes, that makes it cleaner. Fixed.




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636084
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,106 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add 
the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232635986
 
 

 ##
 File path: flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml
 ##
 @@ -0,0 +1,149 @@
+
+
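+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-end-to-end-tests</artifactId>
+   <version>1.7-SNAPSHOT</version>
+   <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-connector-pubsub-emulator-tests</artifactId>
+   <name>flink-connector-pubsub-emulator-tests</name>
+
+   <packaging>jar</packaging>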
+
+   
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636103
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,106 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add 
the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/linking.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+ PubSub Source
+
+The connector provides a Source for reading data from Google PubSub to Apache 
Flink. PubSub has an Atleast-Once guarantee and as such.
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636021
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties
 ##
 @@ -0,0 +1,24 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, testlogger
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232634494
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties
 ##
 @@ -0,0 +1,24 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, testlogger
 
 Review comment:
   Fixed




[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-11-12 Thread GitBox
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232626013
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -62,6 +66,17 @@ under the License.
${project.version}

 
+   <dependency>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>
 
 Review comment:
   I double-checked, but all the other connectors also use
   `<groupId>org.apache.flink</groupId>`.

