Repository: beam
Updated Branches:
  refs/heads/master 8be1dacab -> aa899e4ce
Fix erroneous use of .expand() in KafkaIO

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02591269
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02591269
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02591269

Branch: refs/heads/master
Commit: 0259126920058ffcd08a235fee68f7cfd3d6ffe4
Parents: 8be1dac
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Thu Apr 20 15:07:58 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Thu Apr 20 16:18:14 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/02591269/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 47657bb..fbd96eb 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -561,8 +561,8 @@ public class KafkaIO {
 
     @Override
     public PCollection<KV<K, V>> expand(PBegin begin) {
-      return read
-          .expand(begin)
+      return begin
+          .apply(read)
           .apply("Remove Kafka Metadata",
               ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
                 @ProcessElement