This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7495aa7  [BEAM-7029] Add KafkaIO.Read as an external transform
     new 3aaf39a  Merge pull request #8251: [BEAM-7029] Add KafkaIO.Read as an external transform
7495aa7 is described below

commit 7495aa719037445a17a5af1d503766f5c5781d74
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Mon Apr 8 16:16:28 2019 +0200

    [BEAM-7029] Add KafkaIO.Read as an external transform
    
    This adds KafkaIO.Read as an external transform and includes a Python
    wrapper (ReadFromKafka) for convenience. The transform only returns the
    key/value data for a Kafka topic. It does not include metadata such as
    the partition id, offset, or timestamp.

    By default, the data is returned as `KV<byte[], byte[]>`. Users can supply a
    Kafka Deserializer in ReadFromKafka, such as LongDeserializer, which will
    infer a different coder. Only a limited set of Deserializers is supported.
    Alternatively, users can implement their own decoding in the target SDK.
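A minimal usage sketch of the new Python wrapper (assuming a portable runner
with a job endpoint on localhost:8099 and the Flink job server's built-in
expansion service on localhost:8097; broker and topic names are placeholders):

    import apache_beam as beam
    from apache_beam.io.external.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    # Hypothetical endpoints; adjust to your setup.
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',
        '--streaming',
    ])
    with beam.Pipeline(options=options) as p:
        _ = (p
             | ReadFromKafka(
                 consumer_config={'bootstrap.servers': 'localhost:9092'},
                 topics=['my_topic'],
                 value_deserializer='org.apache.kafka.common.serialization.'
                                    'LongDeserializer',
                 expansion_service='localhost:8097')
             # Values come back decoded as longs because LongDeserializer
             # infers VarLongCoder; keys remain raw bytes.
             | beam.Map(lambda kv: kv[1]))
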
---
 runners/flink/job-server/flink_job_server.gradle   |   2 +
 sdks/java/io/kafka/build.gradle                    |   2 +
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 127 +++++++++++++-
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     | 185 +++++++++++++++++++++
 .../apache_beam/io/external/generate_sequence.py   |   2 +-
 sdks/python/apache_beam/io/external/kafka.py       | 152 +++++++++++++++++
 .../runners/portability/flink_runner_test.py       |  25 ++-
 7 files changed, 492 insertions(+), 3 deletions(-)

diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index ebed8e4..84140fc 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -84,6 +84,8 @@ dependencies {
  compile project(path: ":beam-sdks-java-extensions-google-cloud-platform-core", configuration: "shadow")
   compile library.java.slf4j_simple
 //  TODO: Enable AWS and HDFS file system.
+  // For resolving external transform requests
+  compile project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
 }
 
 // NOTE: runShadow must be used in order to run the job server. The standard run
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 9a88160..cfb1a62 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -33,6 +33,8 @@ dependencies {
   shadow "org.springframework:spring-expression:4.3.18.RELEASE"
  testCompile project(path: ":beam-runners-direct-java", configuration: "shadow")
  testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest")
+  // For testing Cross-language transforms
+  testCompile project(path: ":beam-runners-core-construction-java", configuration: "shadow")
   testCompile library.java.hamcrest_core
   testCompile library.java.hamcrest_library
   testCompile library.java.junit
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 7feee9c..d542bcc 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -21,12 +21,15 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,17 +38,22 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -57,6 +65,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
@@ -351,7 +360,8 @@ public class KafkaIO {
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
-    abstract static class Builder<K, V> {
+    abstract static class Builder<K, V>
+        implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
       abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
 
       abstract Builder<K, V> setTopics(List<String> topics);
@@ -386,6 +396,121 @@ public class KafkaIO {
       abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
 
       abstract Read<K, V> build();
+
+      @Override
+      public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+          External.Configuration config) {
+        ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+        for (byte[] topic : config.topics) {
+          listBuilder.add(utf8String(topic));
+        }
+        setTopics(listBuilder.build());
+
+        String keyDeserializerClassName = utf8String(config.keyDeserializer);
+        Class keyDeserializer = resolveClass(keyDeserializerClassName);
+        setKeyDeserializer(keyDeserializer);
+        setKeyCoder(resolveCoder(keyDeserializer));
+
+        String valueDeserializerClassName = utf8String(config.valueDeserializer);
+        Class valueDeserializer = resolveClass(valueDeserializerClassName);
+        setValueDeserializer(valueDeserializer);
+        setValueCoder(resolveCoder(valueDeserializer));
+
+        Map<String, Object> consumerConfig = new HashMap<>();
+        for (KV<byte[], byte[]> kv : config.consumerConfig) {
+          String key = utf8String(kv.getKey());
+          String value = utf8String(kv.getValue());
+          consumerConfig.put(key, value);
+        }
+        // Key and Value Deserializers always have to be in the config.
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
+        consumerConfig.put(
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
+        setConsumerConfig(consumerConfig);
+
+        // Set required defaults
+        setTopicPartitions(Collections.emptyList());
+        setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN);
+        setMaxNumRecords(Long.MAX_VALUE);
+        setCommitOffsetsInFinalizeEnabled(false);
+        setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
+        // We do not include Metadata until we can encode KafkaRecords cross-language
+        return build().withoutMetadata();
+      }
+
+      private static Coder resolveCoder(Class deserializer) {
+        for (Method method : deserializer.getDeclaredMethods()) {
+          if (method.getName().equals("deserialize")) {
+            Class<?> returnType = method.getReturnType();
+            if (returnType.equals(Object.class)) {
+              continue;
+            }
+            if (returnType.equals(byte[].class)) {
+              return ByteArrayCoder.of();
+            } else if (returnType.equals(Integer.class)) {
+              return VarIntCoder.of();
+            } else if (returnType.equals(Long.class)) {
+              return VarLongCoder.of();
+            } else {
+              throw new RuntimeException("Couldn't infer Coder from " + deserializer);
+            }
+          }
+        }
+        throw new RuntimeException("Couldn't resolve coder for Deserializer: " 
+ deserializer);
+      }
+
+      private static Class resolveClass(String className) {
+        try {
+          return Class.forName(className);
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException("Could not find deserializer class: " + 
className);
+        }
+      }
+
+      private static String utf8String(byte[] bytes) {
+        return new String(bytes, Charsets.UTF_8);
+      }
+    }
+
+    /**
+     * Exposes {@link KafkaIO.TypedWithoutMetadata} as an external transform for cross-language
+     * usage.
+     */
+    @AutoService(ExternalTransformRegistrar.class)
+    public static class External implements ExternalTransformRegistrar {
+
+      public static final String URN = "beam:external:java:kafka:read:v1";
+
+      @Override
+      public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+        return ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);
+      }
+
+      /** Parameters class to expose the transform to an external SDK. */
+      public static class Configuration {
+
+        // All byte arrays are UTF-8 encoded strings
+        private Iterable<KV<byte[], byte[]>> consumerConfig;
+        private Iterable<byte[]> topics;
+        private byte[] keyDeserializer;
+        private byte[] valueDeserializer;
+
+        public void setConsumerConfig(Iterable<KV<byte[], byte[]>> consumerConfig) {
+          this.consumerConfig = consumerConfig;
+        }
+
+        public void setTopics(Iterable<byte[]> topics) {
+          this.topics = topics;
+        }
+
+        public void setKeyDeserializer(byte[] keyDeserializer) {
+          this.keyDeserializer = keyDeserializer;
+        }
+
+        public void setValueDeserializer(byte[] valueDeserializer) {
+          this.valueDeserializer = valueDeserializer;
+        }
+      }
     }
 
     /** Sets the bootstrap servers for the Kafka consumer. */
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
new file mode 100644
index 0000000..ecf68ea
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.model.expansion.v1.ExpansionApi;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.runners.core.construction.expansion.ExpansionService;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+/** Tests for building {@link KafkaIO} externally via the ExpansionService. */
+public class KafkaIOExternalTest {
+  @Test
+  public void testConstructKafkaIO() throws Exception {
+    List<String> topics = ImmutableList.of("topic1", "topic2");
+    String keyDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+    String valueDeserializer = "org.apache.kafka.common.serialization.LongDeserializer";
+    ImmutableMap<String, String> consumerConfig =
+        ImmutableMap.<String, String>builder()
+            .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:port,server2:port")
+            .put("key2", "value2")
+            .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
+            .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
+            .build();
+
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .putConfiguration(
+                "topics",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:iterable:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(listAsBytes(topics)))
+                    .build())
+            .putConfiguration(
+                "consumer_config",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:iterable:v1")
+                    .addCoderUrn("beam:coder:kv:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(mapAsBytes(consumerConfig)))
+                    .build())
+            .putConfiguration(
+                "key_deserializer",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(encodeString(keyDeserializer)))
+                    .build())
+            .putConfiguration(
+                "value_deserializer",
+                ExternalTransforms.ConfigValue.newBuilder()
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .setPayload(ByteString.copyFrom(encodeString(valueDeserializer)))
+                    .build())
+            .build();
+
+    RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
+    ExpansionApi.ExpansionRequest request =
+        ExpansionApi.ExpansionRequest.newBuilder()
+            .setComponents(defaultInstance)
+            .setTransform(
+                RunnerApi.PTransform.newBuilder()
+                    .setUniqueName("test")
+                    .setSpec(
+                        RunnerApi.FunctionSpec.newBuilder()
+                            .setUrn("beam:external:java:kafka:read:v1")
+                            .setPayload(payload.toByteString())))
+            .setNamespace("test_namespace")
+            .build();
+
+    ExpansionService expansionService = new ExpansionService();
+    TestStreamObserver<ExpansionApi.ExpansionResponse> observer = new TestStreamObserver<>();
+    expansionService.expand(request, observer);
+
+    ExpansionApi.ExpansionResponse result = observer.result;
+    RunnerApi.PTransform transform = result.getTransform();
+    assertThat(
+        transform.getSubtransformsList(),
+        Matchers.contains(
+            "test_namespacetest/KafkaIO.Read", "test_namespacetest/Remove 
Kafka Metadata"));
+    assertThat(transform.getInputsCount(), Matchers.is(0));
+    assertThat(transform.getOutputsCount(), Matchers.is(1));
+
+    RunnerApi.PTransform kafkaComposite =
+        result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
+    RunnerApi.PTransform kafkaRead =
+        result.getComponents().getTransformsOrThrow(kafkaComposite.getSubtransforms(0));
+    RunnerApi.ReadPayload readPayload =
+        RunnerApi.ReadPayload.parseFrom(kafkaRead.getSpec().getPayload());
+    KafkaUnboundedSource source =
+        (KafkaUnboundedSource) ReadTranslation.unboundedSourceFromProto(readPayload);
+    KafkaIO.Read spec = source.getSpec();
+
+    assertThat(spec.getConsumerConfig(), Matchers.is(consumerConfig));
+    assertThat(spec.getTopics(), Matchers.is(topics));
+    assertThat(spec.getKeyDeserializer().getName(), Matchers.is(keyDeserializer));
+    assertThat(spec.getValueDeserializer().getName(), Matchers.is(valueDeserializer));
+  }
+
+  private static byte[] listAsBytes(List<String> stringList) throws IOException {
+    IterableCoder<byte[]> coder = IterableCoder.of(ByteArrayCoder.of());
+    List<byte[]> bytesList =
+        stringList.stream().map(KafkaIOExternalTest::rawBytes).collect(Collectors.toList());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    coder.encode(bytesList, baos);
+    return baos.toByteArray();
+  }
+
+  private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {
+    IterableCoder<KV<byte[], byte[]>> coder =
+        IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
+    List<KV<byte[], byte[]>> bytesList =
+        stringMap.entrySet().stream()
+            .map(kv -> KV.of(rawBytes(kv.getKey()), rawBytes(kv.getValue())))
+            .collect(Collectors.toList());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    coder.encode(bytesList, baos);
+    return baos.toByteArray();
+  }
+
+  private static byte[] encodeString(String str) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ByteArrayCoder.of().encode(rawBytes(str), baos);
+    return baos.toByteArray();
+  }
+
+  private static byte[] rawBytes(String str) {
+    Preconditions.checkNotNull(str, "String must not be null.");
+    return str.getBytes(Charsets.UTF_8);
+  }
+
+  private static class TestStreamObserver<T> implements StreamObserver<T> {
+
+    private T result;
+
+    @Override
+    public void onNext(T t) {
+      result = t;
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      throw new RuntimeException("Should not happen", throwable);
+    }
+
+    @Override
+    public void onCompleted() {}
+  }
+}
diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py
index 4ad6b46..4c9d0b8 100644
--- a/sdks/python/apache_beam/io/external/generate_sequence.py
+++ b/sdks/python/apache_beam/io/external/generate_sequence.py
@@ -32,7 +32,7 @@ class GenerateSequence(ptransform.PTransform):
 
   def __init__(self, start, stop=None,
                elements_per_period=None, max_read_time=None,
-               expansion_service=None):
+               expansion_service='localhost:8097'):
     super(GenerateSequence, self).__init__()
     self._urn = 'beam:external:java:generate_sequence:v1'
     self.start = start
diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py
new file mode 100644
index 0000000..b5b4943
--- /dev/null
+++ b/sdks/python/apache_beam/io/external/kafka.py
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+  PTransforms for supporting Kafka in Python pipelines. These transforms do not
+  run a Kafka client in Python. Instead, they expand to ExternalTransform and
+  utilize the Java SDK's Kafka IO. The expansion service will insert Kafka Java
+  transforms before the pipeline is executed. Users currently have to provide
+  the address of the Java expansion service. Flink Users can use the built-in
+  expansion service of the Flink Runner's job server.
+"""
+
+from __future__ import absolute_import
+
+from apache_beam import ExternalTransform
+from apache_beam import pvalue
+from apache_beam.coders import BytesCoder
+from apache_beam.coders import IterableCoder
+from apache_beam.coders import TupleCoder
+from apache_beam.coders.coders import LengthPrefixCoder
+from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
+from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
+from apache_beam.transforms import ptransform
+
+
+class ReadFromKafka(ptransform.PTransform):
+  """
+    An external PTransform which reads from Kafka and returns a KV pair for
+    each item in the specified Kafka topics. If no Kafka Deserializer for
+    key/value is provided, then the data will be returned as a raw byte array.
+
+    Note: To use this transform, you need to start the Java expansion service.
+    Please refer to the portability documentation on how to do that. The
+    expansion service address has to be provided when instantiating this
+    transform. During pipeline translation this transform will be replaced by
+    the Java SDK's KafkaIO.
+
+    If you start Flink's job server, the expansion service will be started on
+    port 8097. This is also the configured default for this transform. For a
+    different address, please set the expansion_service parameter.
+
+    For more information see:
+    - https://beam.apache.org/documentation/runners/flink/
+    - https://beam.apache.org/roadmap/portability/
+
+    Note: Runners need to support translating Read operations in order to use
+    this source. At the moment only the Flink Runner supports this.
+  """
+
+  # Returns the key/value data as raw byte arrays
+  byte_array_deserializer = 'org.apache.kafka.common.serialization.' \
+                            'ByteArrayDeserializer'
+
+  def __init__(self, consumer_config,
+               topics,
+               key_deserializer=byte_array_deserializer,
+               value_deserializer=byte_array_deserializer,
+               expansion_service='localhost:8097'):
+    """
+    Initializes a read operation from Kafka.
+
+    :param consumer_config: A dictionary containing the consumer configuration.
+    :param topics: A list of topic strings.
+    :param key_deserializer: A fully-qualified Java class name of a Kafka
+                             Deserializer for the topic's key, e.g.
+                             'org.apache.kafka.common.
+                             serialization.LongDeserializer'.
+                             Default: 'org.apache.kafka.common.
+                             serialization.ByteArrayDeserializer'.
+    :param value_deserializer: A fully-qualified Java class name of a Kafka
+                               Deserializer for the topic's value, e.g.
+                               'org.apache.kafka.common.
+                               serialization.LongDeserializer'.
+                               Default: 'org.apache.kafka.common.
+                               serialization.ByteArrayDeserializer'.
+    :param expansion_service: The address (host:port) of the ExpansionService.
+    """
+    super(ReadFromKafka, self).__init__()
+    self._urn = 'beam:external:java:kafka:read:v1'
+    self.consumer_config = consumer_config
+    self.topics = topics
+    self.key_deserializer = key_deserializer
+    self.value_deserializer = value_deserializer
+    self.expansion_service = expansion_service
+
+  def expand(self, pbegin):
+    if not isinstance(pbegin, pvalue.PBegin):
+      raise Exception("ReadFromKafka must be a root transform")
+
+    args = {
+        'consumer_config':
+            ReadFromKafka._encode_map(self.consumer_config),
+        'topics':
+            ReadFromKafka._encode_list(self.topics),
+        'key_deserializer':
+            ReadFromKafka._encode_str(self.key_deserializer),
+        'value_deserializer':
+            ReadFromKafka._encode_str(self.value_deserializer),
+    }
+
+    payload = ExternalConfigurationPayload(configuration=args)
+    return pbegin.apply(
+        ExternalTransform(
+            self._urn,
+            payload.SerializeToString(),
+            self.expansion_service))
+
+  @staticmethod
+  def _encode_map(dict_obj):
+    kv_list = [(key.encode('utf-8'), val.encode('utf-8'))
+               for key, val in dict_obj.items()]
+    coder = IterableCoder(TupleCoder(
+        [LengthPrefixCoder(BytesCoder()), LengthPrefixCoder(BytesCoder())]))
+    coder_urns = ['beam:coder:iterable:v1',
+                  'beam:coder:kv:v1',
+                  'beam:coder:bytes:v1',
+                  'beam:coder:bytes:v1']
+    return ConfigValue(
+        coder_urn=coder_urns,
+        payload=coder.encode(kv_list))
+
+  @staticmethod
+  def _encode_list(list_obj):
+    encoded_list = [val.encode('utf-8') for val in list_obj]
+    coder = IterableCoder(LengthPrefixCoder(BytesCoder()))
+    coder_urns = ['beam:coder:iterable:v1',
+                  'beam:coder:bytes:v1']
+    return ConfigValue(
+        coder_urn=coder_urns,
+        payload=coder.encode(encoded_list))
+
+  @staticmethod
+  def _encode_str(str_obj):
+    encoded_str = str_obj.encode('utf-8')
+    coder = LengthPrefixCoder(BytesCoder())
+    coder_urns = ['beam:coder:bytes:v1']
+    return ConfigValue(
+        coder_urn=coder_urns,
+        payload=coder.encode(encoded_str))
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index d67b5fb..13dd7bd 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -29,6 +29,7 @@ from tempfile import mkdtemp
 
 import apache_beam as beam
 from apache_beam.io.external.generate_sequence import GenerateSequence
+from apache_beam.io.external.kafka import ReadFromKafka
 from apache_beam.metrics import Metrics
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PortableOptions
@@ -157,7 +158,7 @@ if __name__ == '__main__':
     def test_no_subtransform_composite(self):
       raise unittest.SkipTest("BEAM-4781")
 
-    def test_external_transform(self):
+    def test_external_transforms(self):
       options = self.create_options()
       options._all_options['parallelism'] = 1
       options._all_options['streaming'] = True
@@ -172,6 +173,28 @@ if __name__ == '__main__':
 
         assert_that(res, equal_to([i for i in range(1, 10)]))
 
+      # We expect to fail here because we do not have a Kafka cluster handy.
+      # Nevertheless, we check that the transform is expanded by the
+      # ExpansionService and that the pipeline fails during execution.
+      with self.assertRaises(Exception) as ctx:
+        with self.create_pipeline() as p:
+          # pylint: disable=expression-not-assigned
+          (p
+           | ReadFromKafka(consumer_config={'bootstrap.servers':
+                                            'notvalid1:7777, notvalid2:3531'},
+                           topics=['topic1', 'topic2'],
+                           key_deserializer='org.apache.kafka.'
+                                            'common.serialization.'
+                                            'ByteArrayDeserializer',
+                           value_deserializer='org.apache.kafka.'
+                                              'common.serialization.'
+                                              'LongDeserializer',
+                           expansion_service=expansion_address))
+      self.assertTrue('No resolvable bootstrap urls given in bootstrap.servers'
+                      in str(ctx.exception),
+                      'Expected to fail due to invalid bootstrap.servers, but '
+                      'failed due to:\n%s' % str(ctx.exception))
+
     def test_flattened_side_input(self):
       # Blocked on support for transcoding
       # https://jira.apache.org/jira/browse/BEAM-6523
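
For reference, a condensed sketch of how a single configuration field travels
to the expansion service, mirroring the new ReadFromKafka._encode_str helper
above (the field name and deserializer class below are only illustrative):

    from apache_beam.coders import BytesCoder
    from apache_beam.coders.coders import LengthPrefixCoder
    from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
    from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload

    # UTF-8 bytes wrapped in a length-prefixed bytes coder and tagged with the
    # standard bytes coder URN, so the Java side (External.Configuration in
    # KafkaIO.java) can decode the raw bytes back into a string.
    deserializer = 'org.apache.kafka.common.serialization.LongDeserializer'
    value = ConfigValue(
        coder_urn=['beam:coder:bytes:v1'],
        payload=LengthPrefixCoder(BytesCoder()).encode(
            deserializer.encode('utf-8')))

    payload = ExternalConfigurationPayload(
        configuration={'value_deserializer': value})
    # payload.SerializeToString() is the payload that ExternalTransform sends
    # in the expansion request for urn 'beam:external:java:kafka:read:v1'.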
