[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232700234 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +/** + * This class defines a bound based on messages received or time since last received message. + * Using start(SourceFunction) starts the bound. Everytime a message is received the sourceFunction should call receivedMessage(). + * When the bound is reached, the sourcefunction gets closed by calling sourceFunction.close() + * See {@link BoundedPubSubSource}. + * + * @param type of message that is received by the SourceFunction. 
+ */ +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
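The Javadoc quoted above describes the bound: stop the source after a maximum number of messages, or after a period with no new messages. Below is a minimal, deterministic sketch of that logic. It is not the PR's `Bound` class. `MessageBound`, `checkIdle()` and the injected clock are hypothetical names, and the source is cancelled through a plain `Runnable` rather than a `SourceFunction`.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical, simplified stand-in for the Bound class under review (not its actual code):
 * cancels a source after a maximum number of messages or after an idle period.
 */
class MessageBound {
    private final long maxMessages;      // 0 disables the count bound
    private final long maxIdleMillis;    // 0 disables the idle bound
    private final LongSupplier clock;    // injected instead of System.currentTimeMillis() so tests are deterministic
    private final Runnable cancelSource; // e.g. () -> sourceFunction.cancel()

    private long messagesReceived = 0;
    private long lastReceivedAt;
    private boolean cancelled = false;

    MessageBound(long maxMessages, long maxIdleMillis, LongSupplier clock, Runnable cancelSource) {
        this.maxMessages = maxMessages;
        this.maxIdleMillis = maxIdleMillis;
        this.clock = clock;
        this.cancelSource = cancelSource;
        this.lastReceivedAt = clock.getAsLong(); // the idle period starts when the bound starts
    }

    /** Called by the source for every message it receives. */
    synchronized void receivedMessage() {
        messagesReceived++;
        lastReceivedAt = clock.getAsLong();
        if (maxMessages > 0 && messagesReceived >= maxMessages) {
            cancel();
        }
    }

    /** Meant to be called periodically, for example from a java.util.Timer every 100 ms. */
    synchronized void checkIdle() {
        if (maxIdleMillis > 0 && clock.getAsLong() - lastReceivedAt > maxIdleMillis) {
            cancel();
        }
    }

    synchronized boolean isCancelled() {
        return cancelled;
    }

    // Cancel at most once, even if both bounds trigger.
    private void cancel() {
        if (!cancelled) {
            cancelled = true;
            cancelSource.run();
        }
    }
}
```

Injecting the clock keeps the timer thread out of the decision logic, so the count and idle rules can be checked without sleeping.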
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232699534 ## File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSourceTest.java ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.Test; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +/** + * Tests for {@link BoundedPubSubSource}. + */ +public class BoundedPubSubSourceTest { + private final Bound bound = mock(Bound.class); + private final SubscriberWrapper subscriberWrapper = mock(SubscriberWrapper.class); + private final SourceFunction.SourceContext sourceContext = mock(SourceFunction.SourceContext.class); + private final AckReplyConsumer ackReplyConsumer = mock(AckReplyConsumer.class); + private final DeserializationSchema deserializationSchema = mock(DeserializationSchema.class); + private final MetricGroup metricGroup = mock(MetricGroup.class); Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232697684 ## File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/BoundTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +/** + * Test for {@link Bound}. + */ +public class BoundTest { + private SourceFunction sourceFunction = mock(SourceFunction.class); Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232690036 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/SerializableCredentialsProvider.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub.common; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.auth.Credentials; + +import java.io.IOException; +import java.io.Serializable; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * Wrapper class for CredentialsProvider to make it Serializable. 
This can be used to pass on Credentials to SourceFunctions + */ +public class SerializableCredentialsProvider implements CredentialsProvider, Serializable { + private final Credentials credentials; Review comment: All `Credentials` implementations are in fact serializable, because the abstract base class itself is declared as `public abstract class Credentials implements Serializable {`.
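Because the abstract base class implements `Serializable`, every subclass inherits the marker, so a wrapper that only holds a `Credentials` field can be serialized, provided the concrete instance's own fields are serializable too. The sketch below illustrates that with plain Java serialization, roughly what happens when a function object is shipped to the task managers. `BaseCredentials`, `StaticTokenCredentials`, `CredentialsHolder` and `SerializationRoundTrip` are hypothetical stand-ins, not the Google auth or Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for com.google.auth.Credentials: the abstract base implements Serializable. */
abstract class BaseCredentials implements Serializable {
    abstract String token();
}

/** A concrete subclass does not re-declare Serializable; it inherits it from the base class. */
class StaticTokenCredentials extends BaseCredentials {
    private final String token;

    StaticTokenCredentials(String token) {
        this.token = token;
    }

    @Override
    String token() {
        return token;
    }
}

/** Hypothetical stand-in for SerializableCredentialsProvider: it only holds the credentials. */
class CredentialsHolder implements Serializable {
    private final BaseCredentials credentials;

    CredentialsHolder(BaseCredentials credentials) {
        this.credentials = credentials;
    }

    BaseCredentials getCredentials() {
        return credentials;
    }
}

final class SerializationRoundTrip {
    /** Writes an object with Java serialization and reads it back as a new instance. */
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}
```

If a concrete subclass added a non-serializable field, `writeObject` would fail with a `NotSerializableException`, so the base class marker is necessary but not on its own a guarantee.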
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232687817 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +/** + * This class defines a bound based on messages received or time since last received message. + * Using start(SourceFunction) starts the bound. Everytime a message is received the sourceFunction should call receivedMessage(). + * When the bound is reached, the sourcefunction gets closed by calling sourceFunction.close() + * See {@link BoundedPubSubSource}. + * + * @param type of message that is received by the SourceFunction. 
+ */ +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || 
mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232626013 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -62,6 +66,17 @@ under the License. ${project.version} + + org.apache.flink + flink-connector-pubsub_${scala.binary.version} Review comment: I double-checked, and all the other connectors are also in org.apache.flink, so I'm not going to change this one.
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232681463 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * A sink function that outputs to PubSub. 
+ * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class); + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { + } + + void setSerializableCredentialsProvider(SerializableCredentialsProvider serializableCredentialsProvider) { + this.serializableCredentialsProvider = serializableCredentialsProvider; + } + + void setSerializationSchema(SerializationSchema serializationSchema) { + this.serializationSchema = serializationSchema; + } + + void setProjectName(String projectName) { + this.projectName = projectName; + } + + void setTopicName(String topicName) { + this.topicName = topicName; + } + + /** +* Set the custom hostname/port combination of PubSub. +* The ONLY reason to use this is during tests with the emulator provided by Google. 
+* +* @param hostAndPort The combination of hostname and port to connect to ("hostname:1234") +*/ + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + void initialize() throws IOException { + if (serializableCredentialsProvider == null) { + serializableCredentialsProvider = SerializableCredentialsProvider.credentialsProviderFromEnvironmentVariables(); + } + if (serializationSchema == null) { + throw new IllegalArgumentException("The serializationSchema has not been specified."); + } + if (projectName == null) { + throw new IllegalArgumentException("The projectName has not been specified."); + } + if (topicName == null) { + throw new IllegalArgumentException("The topicName has not been specified."); + } + } + + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher + .newBuilder(ProjectTopicName.of(projectName, topicName)) +
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232637001 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapper.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Wrapper class around a PubSub {@link Subscriber}. + * This class makes it easier to connect to a Non Google PubSub service such as a local PubSub emulator or docker container. + */ +class SubscriberWrapper implements Serializable, MessageReceiver { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636943 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636872 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/common/PubSubSubscriberFactory.java ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub.common; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import java.io.Serializable; + +/** + * A factory class to create a {@link Subscriber}. + * This allows for customized Subscribers with for instance tweaked configurations Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636963 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,103 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub_${scala.binary.version} + flink-connector-pubsub + + jar + + + + + + com.google.cloud + google-cloud-bom + 0.53.0-alpha Review comment: That was the current version when we wrote it. I have now updated it to 0.70.
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636831 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSource.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them as soon as they have been received. Review comment: The MultipleIdsMessageAcknowledgingSourceBase buffers the message handles and only acks them AFTER the checkpoint has completed. If there is a crash before the checkpoint completes, the message has not yet been acked, so PubSub will deliver it again (after a timeout). If there is a crash after the checkpoint, the message is already persisted as part of the checkpoint.
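The ack-after-checkpoint behaviour described in this comment can be sketched as follows. It only illustrates the idea under simplifying assumptions (string message ids, one thread calling the methods). `CheckpointedAcker` and its method names are hypothetical and are not the API of `MultipleIdsMessageAcknowledgingSourceBase`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of "ack only after the checkpoint completes".
 * It mirrors the idea, not the Flink MultipleIdsMessageAcknowledgingSourceBase API.
 */
class CheckpointedAcker {
    private final Consumer<List<String>> ackFn; // e.g. acks each id through its AckReplyConsumer
    private List<String> pendingSinceLastCheckpoint = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();

    CheckpointedAcker(Consumer<List<String>> ackFn) {
        this.ackFn = ackFn;
    }

    /** Called after a message has been emitted downstream; it is NOT acked yet. */
    synchronized void messageEmitted(String messageId) {
        pendingSinceLastCheckpoint.add(messageId);
    }

    /** Called when a checkpoint is taken: the ids emitted so far belong to this checkpoint. */
    synchronized void snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, pendingSinceLastCheckpoint);
        pendingSinceLastCheckpoint = new ArrayList<>();
    }

    /** Called once the checkpoint is durable: ack everything up to and including it. */
    synchronized void notifyCheckpointComplete(long checkpointId) {
        // Includes older checkpoints too, in case an earlier completion notification was skipped.
        NavigableMap<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> ids : done.values()) {
            ackFn.accept(ids);
        }
        done.clear(); // headMap is a view, so this removes the acked entries from the backing map
    }
}
```

If the job fails between `snapshotState` and `notifyCheckpointComplete`, nothing in that batch has been acked, so PubSub redelivers those messages once their ack deadline passes. That redelivery is why the source is at-least-once rather than exactly-once.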
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636540 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636556 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; + + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + @Override + public Subscriber getSubscriber(CredentialsProvider credentialsProvider, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) { + FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build(); + Subscriber.Builder builder = Subscriber + .newBuilder(ProjectSubscriptionName.of(projectSubscriptionName.getProject(), projectSubscriptionName.getSubscription()), messageReceiver) + .setFlowControlSettings(flowControlSettings) + .setCredentialsProvider(credentialsProvider); + + if (hostAndPort != null) { + ManagedChannel managedChannel = ManagedChannelBuilder + .forTarget(hostAndPort) + .usePlaintext() // This is 'Ok' because this is ONLY used for testing. Review comment: Yes, using the host and port is something that is in practice only used in testing scenarios. That is why I put the comment on the "don't do any encryption" method. I'll see if I can find a more suitable spot. 
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636463 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/DefaultPubSubSubscriberFactory.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory { + private String hostAndPort; + + void withHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + @Override + public Subscriber getSubscriber(CredentialsProvider credentialsProvider, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) { + FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) Review comment: These are not empty lines. This is yet another indentation problem. Fixed.
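The flow control settings quoted above combine `setMaxOutstandingElementCount(1L)` with `LimitExceededBehavior.Block`. Roughly, this means the subscriber should hold back a new message until the previous one is no longer outstanding, for example because it was acked or nacked. The sketch below models only the element-count part with a `java.util.concurrent.Semaphore`. It is an illustrative assumption, not how gax's `FlowController` is implemented, and it ignores the byte limit (`setMaxOutstandingRequestBytes`) entirely. `BlockingElementFlowController` is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of "max N outstanding elements, block when exceeded". */
class BlockingElementFlowController {
    private final Semaphore outstanding;

    BlockingElementFlowController(int maxOutstandingElements) {
        if (maxOutstandingElements < 1) {
            throw new IllegalArgumentException("limit must be >= 1");
        }
        this.outstanding = new Semaphore(maxOutstandingElements);
    }

    /** Blocks until a slot is free: the "Block" behaviour. */
    void reserve() {
        outstanding.acquireUninterruptibly();
    }

    /** Like reserve(), but gives up after the timeout; returns whether a slot was obtained. */
    boolean tryReserve(long timeout, TimeUnit unit) {
        try {
            return outstanding.tryAcquire(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Frees the slot once the message is no longer outstanding (acked or nacked). */
    void release() {
        outstanding.release();
    }
}
```

With a limit of 1 the source handles strictly one message at a time. This keeps the number of unacknowledged messages held between checkpoints small, at the cost of throughput.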
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636422 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/BoundedPubSubSource.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.pubsub.v1.PubsubMessage; + +import java.io.IOException; + +/** + * A bounded PubSub Source, similar to {@link PubSubSource} but this will stop at some point. + * For example after a period of being idle or and after n amount of messages have been received. + */ +public class BoundedPubSubSource extends PubSubSource { Review comment: We chose to split these two to keep as much of the "testing code" out of the normal runtime and thus avoid polluting it. Only the host/port thing remains (which was your other comment about this).
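`BoundedPubSubSource` is meant to "stop at some point", either after a period of being idle or after n messages. The hypothetical `MessageBound` below illustrates that idea: the bound is reached either after a number of messages or after a period without messages. The PR's `Bound` class uses a `java.util.Timer` and closes the source function itself. This sketch differs: it is polled through `isReached()` and takes an injectable clock, so its behaviour can be checked deterministically. Both of those choices are assumptions made for the example.

```java
import java.util.function.LongSupplier;

/** Hypothetical bound: reached after maxMessages messages or maxIdleMillis without a message. */
class MessageBound {
    private final long maxMessages;
    private final long maxIdleMillis;
    private final LongSupplier clockMillis;
    private long receivedCount;
    private long lastReceivedAtMillis;

    MessageBound(long maxMessages, long maxIdleMillis, LongSupplier clockMillis) {
        this.maxMessages = maxMessages;
        this.maxIdleMillis = maxIdleMillis;
        this.clockMillis = clockMillis;
        // Idle time is counted from creation until the first message arrives.
        this.lastReceivedAtMillis = clockMillis.getAsLong();
    }

    /** Call once for every message the source receives. */
    void receivedMessage() {
        receivedCount++;
        lastReceivedAtMillis = clockMillis.getAsLong();
    }

    /** True once either the message limit or the idle limit has been hit. */
    boolean isReached() {
        boolean enoughMessages = receivedCount >= maxMessages;
        boolean idleTooLong = clockMillis.getAsLong() - lastReceivedAtMillis >= maxIdleMillis;
        return enoughMessages || idleTooLong;
    }
}
```

In a real source you would pass `System::currentTimeMillis` as the clock and stop the source as soon as `isReached()` returns true.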
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636122 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636183 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() Review comment: Yes
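The quoted documentation says the bare minimum for the source builder is a project, a subscription and a way to deserialize messages. The sketch below shows one way a builder can enforce that at `build()` time. `PubSubSourceSettings` is hypothetical and is not the connector's class. Its method names mirror the documentation example, but a plain `Function<byte[], T>` stands in for Flink's `DeserializationSchema` so that the example runs without Flink on the classpath.

```java
import java.util.function.Function;

/** Hypothetical settings object; only illustrates required-field checks in a builder. */
final class PubSubSourceSettings<T> {
    final String project;
    final String subscription;
    final Function<byte[], T> deserializer; // stand-in for a DeserializationSchema

    private PubSubSourceSettings(String project, String subscription, Function<byte[], T> deserializer) {
        this.project = project;
        this.subscription = subscription;
        this.deserializer = deserializer;
    }

    static <T> Builder<T> newBuilder() {
        return new Builder<>();
    }

    static final class Builder<T> {
        private String project;
        private String subscription;
        private Function<byte[], T> deserializer;

        Builder<T> withDeserializationSchema(Function<byte[], T> deserializer) {
            this.deserializer = deserializer;
            return this;
        }

        Builder<T> withProjectSubscriptionName(String project, String subscription) {
            this.project = project;
            this.subscription = subscription;
            return this;
        }

        /** Fails fast if any of the three required settings is missing. */
        PubSubSourceSettings<T> build() {
            if (deserializer == null) {
                throw new IllegalStateException("a deserializer is required");
            }
            if (project == null || subscription == null) {
                throw new IllegalStateException("project and subscription are required");
            }
            return new PubSubSourceSettings<>(project, subscription, deserializer);
        }
    }
}
```

Failing in `build()` surfaces a missing setting while the job is being assembled, rather than later when the source starts running.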
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636206 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,103 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub_${scala.binary.version} + flink-connector-pubsub + + jar + + + + + + com.google.cloud + google-cloud-bom + 0.53.0-alpha + pom + import + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + com.google.cloud + google-cloud-pubsub + + + + + com.google.guava + guava-jdk5 + + + provided + + + + org.slf4j + slf4j-api Review comment: Yes, that makes it cleaner. Fixed.
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636055 ## File path: flink-connectors/flink-connector-pubsub/src/test/java/org/apache/flink/streaming/connectors/pubsub/SubscriberWrapperTest.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.core.ApiService; +import com.google.api.gax.core.CredentialsProvider; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.apache.flink.api.java.ClosureCleaner.ensureSerializable; +import static org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider.withoutCredentials; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link SubscriberWrapper}. 
+ */ +@RunWith(MockitoJUnitRunner.class) +public class SubscriberWrapperTest { + @Mock + private PubSubSubscriberFactory pubSubSubscriberFactory; + + @Mock + private Subscriber subscriber; + + @Mock + private ApiService apiService; + + private PubsubMessage pubsubMessage = pubSubMessage(); + + @Mock + private AckReplyConsumer ackReplyConsumer; + + private SubscriberWrapper subscriberWrapper; + + @Before + public void setup() throws Exception { + when(pubSubSubscriberFactory.getSubscriber(any(), any(), any())).thenReturn(subscriber); + subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), pubSubSubscriberFactory); + } + + @Test + public void testSerializedSubscriberBuilder() throws Exception { + SubscriberWrapper subscriberWrapper = new SubscriberWrapper(withoutCredentials(), ProjectSubscriptionName.of("projectId", "subscriptionId"), SubscriberWrapperTest::subscriberFactory); + ensureSerializable(subscriberWrapper); + } + + @Test + public void testInitialisation() { + SerializableCredentialsProvider credentialsProvider = withoutCredentials(); + ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of("projectId", "subscriptionId"); + SubscriberWrapper subscriberWrapper = new SubscriberWrapper(credentialsProvider, projectSubscriptionName, pubSubSubscriberFactory); + + subscriberWrapper.initialize(); + verify(pubSubSubscriberFactory, times(1)).getSubscriber(credentialsProvider, projectSubscriptionName, subscriberWrapper); + } + + @Test + public void testStart() { + when(subscriber.startAsync()).thenReturn(apiService); + subscriberWrapper.initialize(); + + subscriberWrapper.start(); + verify(apiService, times(1)).awaitRunning(); + assertThat(subscriberWrapper.amountOfMessagesInBuffer(), is(0)); + } + + @Test + public void testStopWithoutInitialize() { + subscriberWrapper.stop(); + verifyZeroInteractions(subscriber); + } + + @Test + public void
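The quoted test exercises `SubscriberWrapper` through `initialize()`, `start()`, `stop()` and `amountOfMessagesInBuffer()`. In `testInitialisation`, the wrapper also passes itself to the factory as the message receiver. Below is a stdlib-only sketch of a wrapper with that shape. It is inferred from the test's method names only and is not the PR's implementation. `MessageCallback`, `SubscriberLifecycle` and `BufferingSubscriberWrapper` are hypothetical stand-ins for the Pub/Sub `MessageReceiver`, the `Subscriber` and the real wrapper.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

/** Hypothetical stand-in for the client's receiver callback. */
interface MessageCallback<M> {
    void receive(M message, Runnable ack);
}

/** Hypothetical stand-in for the subscriber's start/stop lifecycle. */
interface SubscriberLifecycle {
    void start();
    void stop();
}

/** Buffers received messages (with their ack handles) until the source polls them. */
final class BufferingSubscriberWrapper<M> implements MessageCallback<M> {
    static final class Received<M> {
        final M message;
        final Runnable ack;

        Received(M message, Runnable ack) {
            this.message = message;
            this.ack = ack;
        }
    }

    private final BlockingQueue<Received<M>> buffer = new LinkedBlockingQueue<>();
    private final Function<MessageCallback<M>, SubscriberLifecycle> subscriberFactory;
    private SubscriberLifecycle subscriber; // null until initialize() is called

    BufferingSubscriberWrapper(Function<MessageCallback<M>, SubscriberLifecycle> subscriberFactory) {
        this.subscriberFactory = subscriberFactory;
    }

    /** Creates the subscriber, registering this wrapper as its receiver. */
    void initialize() {
        subscriber = subscriberFactory.apply(this);
    }

    void start() {
        if (subscriber == null) {
            throw new IllegalStateException("initialize() must be called first");
        }
        subscriber.start();
    }

    /** Safe to call without initialize(): then there is nothing to stop. */
    void stop() {
        if (subscriber != null) {
            subscriber.stop();
        }
    }

    @Override
    public void receive(M message, Runnable ack) {
        buffer.add(new Received<>(message, ack));
    }

    int amountOfMessagesInBuffer() {
        return buffer.size();
    }

    /** Returns the oldest buffered message, or null if the buffer is empty. */
    Received<M> poll() {
        return buffer.poll();
    }
}
```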
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636179 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() + .withDeserializationSchema(deserializationSchema) + .withProjectSubscriptionName("google-project-name", "pubsub-subscription") + .build(); + +streamExecEnv.addSource(pubsubSource); +{% endhighlight %} + + + + PubSub Sink + +The connector provides a Sink for writing data to PubSub. 
+ +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +This builder works in a similar way to the PubSubSource. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SerializationSchema serializationSchema = (...); +SinkFunction pubsubSink = PubSubSink.newBuilder() + .withSerializationSchema(serializationSchema) + .withTopicName("pubsub-topic-name") + .withProjectName("google-project-name") + .build() + +streamExecEnv.addSink(pubsubSink); +{% endhighlight %} + + + + Google Credentials + +Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google cloud resources such as PubSub. Both builders allow several ways to provide these credentials. + +By default the connectors will look for an environment variable: [GOOGLE_APPLICATION_CREDENTIALS](https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually) which should point to a file containing the credentials. + +It is also possible to provide a Credentials object directly. For instance if you read the Credentials yourself from an external system. In this case you can use `PubSubSource.newBuilder().withCredentials(...)` Review comment: Fixed
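The credentials section describes two options: an explicit Credentials object, or a file named by the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. The sketch below models just that choice, assuming that explicit credentials take precedence over the environment variable. `CredentialsChoice` is hypothetical. Explicit credentials are represented by a plain string, and the environment is passed in as a map (for example `System.getenv()`) so that the logic can be tested. Google's own application-default lookup may consult further locations that are not modelled here.

```java
import java.util.Map;

/** Hypothetical model of where a connector would take its credentials from. */
final class CredentialsChoice {
    static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

    enum Source { EXPLICIT, ENVIRONMENT_FILE }

    final Source source;
    final String detail; // the explicit credentials (as a string here) or the file path

    private CredentialsChoice(Source source, String detail) {
        this.source = source;
        this.detail = detail;
    }

    /**
     * Explicit credentials (nullable) win; otherwise fall back to the file named by
     * GOOGLE_APPLICATION_CREDENTIALS; otherwise fail with an actionable message.
     */
    static CredentialsChoice resolve(String explicitCredentials, Map<String, String> env) {
        if (explicitCredentials != null) {
            return new CredentialsChoice(Source.EXPLICIT, explicitCredentials);
        }
        String path = env.get(ENV_VAR);
        if (path != null && !path.isEmpty()) {
            return new CredentialsChoice(Source.ENVIRONMENT_FILE, path);
        }
        throw new IllegalStateException(
                "No credentials configured: pass them explicitly or set " + ENV_VAR);
    }
}
```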
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636196 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{site.baseurl}}/dev/linking.html) +for information about how to package the program with the libraries for +cluster execution. + + PubSub Source + +The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such. + +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a google project and pubsub subscription and a way to deserialize the PubSubMessages. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializationSchema = (...); +SourceFunction pubsubSource = PubSubSource.newBuilder() + .withDeserializationSchema(deserializationSchema) + .withProjectSubscriptionName("google-project-name", "pubsub-subscription") + .build(); + +streamExecEnv.addSource(pubsubSource); +{% endhighlight %} + + + + PubSub Sink + +The connector provides a Sink for writing data to PubSub. 
+ +The class `PubSubSource(…)` has a builder to create PubSubsources. `PubSubSource.newBuilder()` + +This builder works in a similar way to the PubSubSource. +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +SerializationSchema serializationSchema = (...); +SinkFunction pubsubSink = PubSubSink.newBuilder() + .withSerializationSchema(serializationSchema) + .withTopicName("pubsub-topic-name") + .withProjectName("google-project-name") + .build() + +streamExecEnv.addSink(pubsubSink); +{% endhighlight %} + + + + Google Credentials + +Google uses [Credentials](https://cloud.google.com/docs/authentication/production) to authenticate and authorize applications so that they can use Google cloud resources such as PubSub. Both builders allow several ways to provide these credentials. + +By default the connectors will look for an environment variable: [GOOGLE_APPLICATION_CREDENTIALS](https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually) which should point to a file containing the credentials. + +It is also possible to provide a Credentials object directly. For instance if you read the Credentials yourself from an external system. In this case you can use `PubSubSource.newBuilder().withCredentials(...)` + + Integration testing + +When using integration tests you might not want to connect to PubSub directly but use a docker container to read and write to. This is possible by using `PubSubSource.newBuilder().withHostAndPort("localhost:1234")`. Review comment: Fixed
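The integration-testing section passes the emulator address as a single `"host:port"` string. Validating that string up front gives a clearer error than a failed connection later. A hypothetical parser is sketched below. It is not part of the connector, and Guava's `com.google.common.net.HostAndPort.fromString` would be an off-the-shelf alternative.

```java
/** Hypothetical value object for an emulator address such as "localhost:1234". */
final class EmulatorEndpoint {
    final String host;
    final int port;

    private EmulatorEndpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Splits on the last ':' and validates both parts; throws IllegalArgumentException otherwise. */
    static EmulatorEndpoint parse(String hostAndPort) {
        int colon = hostAndPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got '" + hostAndPort + "'");
        }
        int port;
        try {
            port = Integer.parseInt(hostAndPort.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number in '" + hostAndPort + "'", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in '" + hostAndPort + "'");
        }
        return new EmulatorEndpoint(hostAndPort.substring(0, colon), port);
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }
}
```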
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636212 ## File path: flink-connectors/flink-connector-pubsub/pom.xml ## @@ -0,0 +1,103 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-connectors + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub_${scala.binary.version} + flink-connector-pubsub + + jar + + + + + + com.google.cloud + google-cloud-bom + 0.53.0-alpha + pom + import + + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + provided + + + + com.google.cloud + google-cloud-pubsub + + + + + com.google.guava + guava-jdk5 + + + provided + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + org.slf4j + slf4j-log4j12 Review comment: Yes, that makes it cleaner. Fixed.
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232636084 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,106 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-pubsub{{ site.scala_version_suffix }} + {{site.version }} Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r232635986 ## File path: flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/pom.xml ## @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-end-to-end-tests + 1.7-SNAPSHOT + .. + + + flink-connector-pubsub-emulator-tests + flink-connector-pubsub-emulator-tests + + jar + + Review comment: Fixed
[GitHub] nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636103

## File path: docs/dev/connectors/pubsub.md ##
@@ -0,0 +1,106 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/linking.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+ PubSub Source
+
+The connector provides a Source for reading data from Google PubSub to Apache Flink. PubSub has an Atleast-Once guarantee and as such.

Review comment:
Fixed
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232636021

## File path: flink-connectors/flink-connector-pubsub/src/test/resources/log4j-test.properties ##
@@ -0,0 +1,24 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, testlogger

Review comment:
Fixed
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232634494

## File path: flink-end-to-end-tests/flink-connector-pubsub-emulator-tests/src/test/resources/log4j-test.properties ##
@@ -0,0 +1,24 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, testlogger

Review comment:
Fixed
nielsbasjes commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r232626013

## File path: flink-examples/flink-examples-streaming/pom.xml ##
@@ -62,6 +66,17 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-pubsub_${scala.binary.version}</artifactId>

Review comment:
I double-checked, but all the other connectors are also in org.apache.flink.