Hi Naveen,

The error message is accurate: TextIO can currently only be applied to a bounded PCollection, but reading from KafkaIO produces an unbounded PCollection.

If you are just testing things out, you can use .withMaxNumRecords() or .withMaxReadTime() to get a bounded PCollection from KafkaIO. If you have a real use case for writing unbounded data to a textual sink, then you'll need to implement the write via an idempotent writing transform of your own devising. One example of such a transform is used in WindowedWordCount (https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java#L232).
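As a loose illustration of the bounded/unbounded distinction (plain Java, not the Beam API; Stream.iterate here just stands in for an unbounded source):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BoundedSketch {
    // An "unbounded" source: Stream.iterate never terminates on its own,
    // much as a KafkaIO read yields an unbounded PCollection.
    // limit(n) plays the role of withMaxNumRecords(n): it turns the
    // unbounded source into a bounded one that a batch-style sink can drain.
    static List<Integer> firstN(int n) {
        return Stream.iterate(0, i -> i + 1)
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

In the same spirit, withMaxNumRecords() caps how many Kafka records the read will produce, which is what makes the downstream PCollection bounded and therefore acceptable to TextIO.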
Kenn

On Tue, Jan 17, 2017 at 2:16 PM, Madhire, Naveen <[email protected]> wrote:

> Hi Kenn,
>
> I looked at the code base and figured out that withoutMetadata is a method
> of the TypedRead class, so I modified my code as below to avoid the
> compilation error.
>
> I also looked at the KafkaIO test cases
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L596>
> and all the test cases actually use the TypedRead class. Maybe we should
> update the example present here
> <https://beam.apache.org/documentation/sdks/javadoc/0.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.html>
>
> p.apply(((KafkaIO.TypedRead<String, String>) KafkaIO.read()
>         .withTopics(ImmutableList.of("Topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .updateConsumerProperties((Map) Maps.fromProperties(properties)))
>     .withoutMetadata())
>  .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>      @Override
>      public String apply(KV<String, String> input) {
>          return input.getKey() + " " + input.getValue();
>      }
>  }))
>  .apply(TextIO.Write.to("Location"));
>
> When I run now, I see a runtime error at the last statement, TextIO.Write.
>
> Is there anything incorrect with the TextIO code?
>
> DynamicallyReshardedWrite can only be applied to a Bounded PCollection
>     at org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>     at org.apache.beam.runners.direct.WriteWithShardingFactory$DynamicallyReshardedWrite.expand(WriteWithShardingFactory.java:70)
>     at org.apache.beam.runners.direct.WriteWithShardingFactory$DynamicallyReshardedWrite.expand(WriteWithShardingFactory.java:61)
>     at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
>     at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:295)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:385)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:315)
>     at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:291)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:385)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:315)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
>     at org.apache.beam.sdk.io.TextIO$Write$Bound.expand(TextIO.java:757)
>     at org.apache.beam.sdk.io.TextIO$Write$Bound.expand(TextIO.java:527)
>     at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
>     at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:295)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:385)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:299)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
>
> Thanks,
> Naveen
>
> From: Kenneth Knowles <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, January 12, 2017 at 4:14 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: KafkaIO Example
>
> Hi Naveen,
>
> I have just successfully compiled the code from your most recent email, so
> I suspect the error lies elsewhere.
> Taking a wild guess, the error you are getting would be expected if a
> transform were upcast to the raw type PTransform, since its raw output
> type is POutput.
>
> Kenn
>
> On Thu, Jan 12, 2017 at 12:55 PM, Madhire, Naveen <[email protected]> wrote:
>
> I changed the code to see where the problem is with incompatible
> parameters.
>
> I get the below error; is there an issue with the withoutMetadata() method,
> since KV<K, V> doesn't implement POutput?
>
> java: incompatible types: org.apache.beam.sdk.values.POutput cannot be
> converted to org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.String>>
>
> PCollection<KV<String, String>> kafkaRead = p.apply(KafkaIO.read()
>         .withTopics(ImmutableList.of("Topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .updateConsumerProperties((Map) Maps.fromProperties(properties))
>         .withoutMetadata());
>
> kafkaRead
>     .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>         @Override
>         public String apply(KV<String, String> input) {
>             return input.getKey() + " " + input.getValue();
>         }
>     }))
>     .apply(TextIO.Write.to("Location"));
>
> Thanks,
> Naveen
>
> From: "Madhire, Naveen" <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, January 12, 2017 at 11:55 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: KafkaIO Example
>
> Yes, I've verified the Kafka connectivity outside of Beam. I wanted to see
> how it works if I read Kafka data using Beam.
>
> I changed the code to use the default direct runner and am getting a
> compilation error now:
>
> java: cannot find symbol
> symbol: method apply(java.lang.String,org.apache.beam.sdk.transforms.MapElements<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.String>,java.lang.String>)
> location: interface org.apache.beam.sdk.values.POutput
>
> Am I missing any other dependency in the pom? Please let me know.
>
> Code:
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> // Read from Kafka topic
> try {
>     InputStream props = Resources.getResource("client.properties").openStream();
>     Properties properties = new Properties();
>     properties.load(props);
>
>     p.apply(KafkaIO.read()
>             .withTopics(ImmutableList.of("Topic"))
>             .withKeyCoder(StringUtf8Coder.of())
>             .withValueCoder(StringUtf8Coder.of())
>             .updateConsumerProperties((Map) Maps.fromProperties(properties))
>             .withoutMetadata())
>         .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 return input.getKey() + " " + input.getValue();
>             }
>         }))
>         .apply(TextIO.Write.to("Location"));
> }
>
> I have the below dependencies included in my pom:
>
> <!-- Apache Beam dependencies -->
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-core</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-direct-java</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-core-java</artifactId>
>     <version>0.4.0</version>
> </dependency>
>
> <!-- Business logic -->
> <dependency>
>     <groupId>org.drools</groupId>
>     <artifactId>drools-core</artifactId>
>     <version>6.2.0.Final</version>
> </dependency>
>
> Thanks,
> Naveen
>
> From: Lukasz Cwik <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, January 12, 2017 at 10:58 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: KafkaIO Example
>
> I'm assuming you mean DirectRunner and not DataflowDirectRunner.
>
> On Wed, Jan 11, 2017 at 4:23 PM, Raghu Angadi <[email protected]> wrote:
>
> On Wed, Jan 11, 2017 at 1:56 PM, Madhire, Naveen <[email protected]> wrote:
>
> I can confirm the authorization and authentication to Kafka is behaving
> correctly.
>
> Did you confirm this within the Beam/KafkaIO context or outside? When I
> last checked, Kafka requires the credentials file to be available on the
> local filesystem where the KafkaConsumer runs. All the workers need to
> have this file, and shipping these files to workers depends on the
> runner/execution environment.
>
> Can you try with DataflowDirectRunner? That would be simpler to debug.
>
> Raghu.
>
> ------------------------------
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
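A footnote on the raw-type guess earlier in the thread: the "cannot find symbol" behavior can be reproduced in plain Java with simplified stand-in classes (the PCollection and POutput below are mock-ups, not the real Beam types):

```java
import java.util.function.Function;

public class RawTypeSketch {
    // Simplified stand-ins for the Beam types; not the real API.
    interface POutput {}

    static class PCollection<T> implements POutput {
        final T value;
        PCollection(T value) { this.value = value; }
        <R> PCollection<R> apply(Function<T, R> fn) {
            return new PCollection<>(fn.apply(value));
        }
    }

    static String describe(PCollection<Integer> typed) {
        // Fully typed: apply() resolves and the result type is known.
        return typed.apply(n -> "value=" + n).value;
    }

    public static void main(String[] args) {
        PCollection<Integer> typed = new PCollection<>(42);
        System.out.println(describe(typed));

        // Once the value is typed only as the base interface, the compiler
        // knows nothing beyond POutput, so this would fail to compile with
        // "cannot find symbol: method apply(...)":
        //   raw.apply(n -> "value=" + n);
        POutput raw = typed;

        // The workaround used in the thread: an explicit cast back to the
        // concrete generic type makes apply() visible again.
        @SuppressWarnings("unchecked")
        PCollection<Integer> recovered = (PCollection<Integer>) raw;
        System.out.println(describe(recovered));
    }
}
```

Once the value is seen only as the base interface, apply() is no longer visible to the compiler; the explicit cast back to the concrete generic type, as in Naveen's workaround, restores it.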
