support topicPartition in BeamKafkaTable
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92b3a9ac Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92b3a9ac Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92b3a9ac Branch: refs/heads/tez-runner Commit: 92b3a9acf04450313c3c2340f6921bf433c2dc04 Parents: acfab89 Author: mingmxu <ming...@ebay.com> Authored: Fri Nov 10 19:51:37 2017 -0800 Committer: James Xu <xumingmi...@gmail.com> Committed: Mon Nov 13 14:41:29 2017 +0800 ---------------------------------------------------------------------- .../sql/meta/provider/kafka/BeamKafkaTable.java | 39 +++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/92b3a9ac/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java index 50f7496..8f663a3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -45,6 +46,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; public 
abstract class BeamKafkaTable extends BaseBeamTable implements Serializable { private String bootstrapServers; private List<String> topics; + private List<TopicPartition> topicPartitions; private Map<String, Object> configUpdates; protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) { @@ -58,6 +60,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab this.topics = topics; } + public BeamKafkaTable(BeamRecordSqlType beamSqlRowType, + List<TopicPartition> topicPartitions, String bootstrapServers) { + super(beamSqlRowType); + this.bootstrapServers = bootstrapServers; + this.topicPartitions = topicPartitions; + } + public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) { this.configUpdates = configUpdates; return this; @@ -76,15 +85,27 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab @Override public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply("read", - KafkaIO.<byte[], byte[]>read() - .withBootstrapServers(bootstrapServers) - .withTopics(topics) - .updateConsumerProperties(configUpdates) - .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) - .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) - .withoutMetadata()) - .apply("in_format", getPTransformForInput()); + KafkaIO.Read<byte[], byte[]> kafkaRead = null; + if (topics != null) { + kafkaRead = KafkaIO.<byte[], byte[]>read() + .withBootstrapServers(bootstrapServers) + .withTopics(topics) + .updateConsumerProperties(configUpdates) + .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) + .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()); + } else if (topicPartitions != null) { + kafkaRead = KafkaIO.<byte[], byte[]>read() + .withBootstrapServers(bootstrapServers) + .withTopicPartitions(topicPartitions) + .updateConsumerProperties(configUpdates) + 
.withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()) + .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of()); + } else { + throw new IllegalArgumentException("One of topics and topicPartitions must be configured."); + } + + return PBegin.in(pipeline).apply("read", kafkaRead.withoutMetadata()) +.apply("in_format", getPTransformForInput()); } @Override