Hi Naveen,

The error message is accurate: currently TextIO can only be applied to a
bounded PCollection, but reading from KafkaIO produces an unbounded
PCollection. If you are just testing things out, you can use
.withMaxNumRecords() or .withMaxReadTime() to get a bounded PCollection
from KafkaIO. If you have a real use case of writing unbounded data to a
textual sink, then you'll need to implement the write via an idempotent
writing transform of your own devising. One example of such a transform is
used in WindowedWordCount (
https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java#L232
).
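
For a quick sanity check, the bounded route could look roughly like this (an untested sketch against 0.4.0, reusing the cast and properties from your snippet below; I'm assuming the bounding call chains on the typed read as shown, and "Topic"/"Location" are placeholders):

// Untested sketch: bound the otherwise-unbounded Kafka source so that
// bounded-only sinks such as TextIO.Write can be applied downstream.
p.apply(((KafkaIO.TypedRead<String, String>) KafkaIO.read()
        .withTopics(ImmutableList.of("Topic"))
        .withKeyCoder(StringUtf8Coder.of())
        .withValueCoder(StringUtf8Coder.of())
        .updateConsumerProperties((Map) Maps.fromProperties(properties)))
        .withMaxNumRecords(1000)  // or .withMaxReadTime(Duration.standardMinutes(1))
        .withoutMetadata())
    .apply(MapElements.via(new SimpleFunction<KV<String, String>, String>() {
      @Override
      public String apply(KV<String, String> input) {
        return input.getKey() + "  " + input.getValue();
      }
    }))
    .apply(TextIO.Write.to("Location"));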

Kenn

On Tue, Jan 17, 2017 at 2:16 PM, Madhire, Naveen <
[email protected]> wrote:

> Hi Kenn,
>
>
>
> I looked at the code base and figured out that withoutMetadata is a method
> of the TypedRead class, so I modified my code as below to avoid the
> compilation error.
>
> I also looked at the KafkaIO test cases
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L596>
> and all of them actually use the TypedRead class. Maybe we should update
> the example here
> <https://beam.apache.org/documentation/sdks/javadoc/0.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.html>
>
>
>
>
>
>
>
>
>
> // Cast to TypedRead so that withoutMetadata() is available.
> p.apply(((KafkaIO.TypedRead<String, String>) KafkaIO.read()
>         .withTopics(ImmutableList.of("Topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .updateConsumerProperties((Map) Maps.fromProperties(properties)))
>         .withoutMetadata())
>     .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>         @Override
>         public String apply(KV<String, String> input) {
>             return input.getKey() + "  " + input.getValue();
>         }
>     }))
>     .apply(TextIO.Write.to("Location"));
>
>
>
>
>
> When I run now, I see a runtime error at the last statement, TextIO.Write.
>
>
>
> Is there anything incorrect with the TextIO code?
>
>
>
> DynamicallyReshardedWrite can only be applied to a Bounded PCollection
>
>     at org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>     at org.apache.beam.runners.direct.WriteWithShardingFactory$DynamicallyReshardedWrite.expand(WriteWithShardingFactory.java:70)
>     at org.apache.beam.runners.direct.WriteWithShardingFactory$DynamicallyReshardedWrite.expand(WriteWithShardingFactory.java:61)
>     at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
>     at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:295)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:385)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:315)
>     at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:291)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:385)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:315)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
>     at org.apache.beam.sdk.io.TextIO$Write$Bound.expand(TextIO.java:757)
>     at org.apache.beam.sdk.io.TextIO$Write$Bound.expand(TextIO.java:527)
>     at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
>     at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:295)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:385)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:299)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
>
>
>
>
>
> Thanks,
>
> Naveen
>
>
>
> From: Kenneth Knowles <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, January 12, 2017 at 4:14 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: KafkaIO Example
>
>
>
> Hi Naveen,
>
>
>
> I have just successfully compiled the code from your most recent email, so
> I suspect the error lies elsewhere.
>
>
>
> Taking a wild guess, the error you are getting would be expected if a
> transform were upcast to the raw type PTransform, as its raw output type is
> POutput.
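>
> For illustration only (a contrived, untested sketch, not your actual code),
> something along these lines would produce that kind of error:
>
> // Going through the raw PTransform type erases the output type, so the
> // result of apply(...) is only statically known to be POutput.
> PTransform rawRead = KafkaIO.read().withTopics(ImmutableList.of("Topic"));
> POutput output = p.apply(rawRead);  // not a PCollection<KV<String, String>>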
>
>
>
> Kenn
>
>
>
> On Thu, Jan 12, 2017 at 12:55 PM, Madhire, Naveen <
> [email protected]> wrote:
>
> I changed the code to see where the problem is with incompatible
> parameters.
>
>
>
> I get the below error. Is there an issue with the withoutMetadata() method,
> since KV<K, V> doesn't implement POutput?
>
>
>
> java: incompatible types: org.apache.beam.sdk.values.POutput cannot be converted to
> org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.String>>
>
>
>
> PCollection<KV<String, String>> kafkaRead = p.apply(KafkaIO.read()
>         .withTopics(ImmutableList.of("Topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .updateConsumerProperties((Map) Maps.fromProperties(properties))
>         .withoutMetadata());
>
> kafkaRead
>     .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>         @Override
>         public String apply(KV<String, String> input) {
>             return input.getKey() + "  " + input.getValue();
>         }
>     }))
>     .apply(TextIO.Write.to("Location"));
>
>
>
>
>
>
>
> Thanks,
>
> Naveen
>
>
>
> From: "Madhire, Naveen" <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, January 12, 2017 at 11:55 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: KafkaIO Example
>
>
>
> Yes, I've verified the Kafka connectivity outside of Beam. I wanted to see
> how it works when I read Kafka data using Beam.
>
>
>
> I changed the code to use the default direct runner and am now getting a
> compilation error:
>
>
>
>
>
> java: cannot find symbol
>   symbol:   method apply(java.lang.String,org.apache.beam.sdk.transforms.MapElements<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.String>,java.lang.String>)
>   location: interface org.apache.beam.sdk.values.POutput
>
>
>
> Am I missing any other dependency in my POM? Please let me know.
>
>
>
> Code:
>
>
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> // Read from Kafka topic
> try {
>     InputStream props = Resources.getResource("client.properties").openStream();
>     Properties properties = new Properties();
>     properties.load(props);
>
>     p.apply(KafkaIO.read()
>         .withTopics(ImmutableList.of("Topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .updateConsumerProperties((Map) Maps.fromProperties(properties))
>         .withoutMetadata())
>         .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 return input.getKey() + "  " + input.getValue();
>             }
>         }))
>         .apply(TextIO.Write.to("Location"));
> }
>
>
>
> I've included the below dependencies in my pom:
>
>
>
>
>
> <!-- Apache Beam dependencies -->
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-core</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-direct-java</artifactId>
>     <version>0.4.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-core-java</artifactId>
>     <version>0.4.0</version>
> </dependency>
>
> <!-- Business logic -->
> <dependency>
>     <groupId>org.drools</groupId>
>     <artifactId>drools-core</artifactId>
>     <version>6.2.0.Final</version>
> </dependency>
>
>
>
> Thanks,
>
> Naveen
>
>
>
> From: Lukasz Cwik <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, January 12, 2017 at 10:58 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: KafkaIO Example
>
>
>
> I'm assuming you mean DirectRunner and not DataflowDirectRunner.
>
>
>
> On Wed, Jan 11, 2017 at 4:23 PM, Raghu Angadi <[email protected]> wrote:
>
>
>
> On Wed, Jan 11, 2017 at 1:56 PM, Madhire, Naveen <
> [email protected]> wrote:
>
> I can confirm the authorization and authentication to Kafka are behaving
> correctly.
>
>
>
> Did you confirm this within the Beam/KafkaIO context or outside of it? When I
> last checked, Kafka requires the credentials file to be available on the local
> filesystem where the KafkaConsumer runs, so all the workers need to have this
> file. Shipping these files to workers depends on the runner/execution
> environment.
>
>
>
> Can you try with DataflowDirectRunner? That would be simpler to debug.
>
>
>
> Raghu.
>
>
>
>
>
>
