Hi Naveen,

I have just successfully compiled the code from your most recent email, so I suspect the error lies elsewhere.
Taking a wild guess, the error you are getting would be expected if a
transform were upcast to the raw type PTransform, as its raw output type is
POutput.

Kenn

On Thu, Jan 12, 2017 at 12:55 PM, Madhire, Naveen <[email protected]> wrote:

> I changed the code to see where the problem with the incompatible
> parameters is.
>
> I get the error below. Is there an issue with the withoutMetadata() method,
> since KV<K, V> doesn't implement POutput?
>
> java: incompatible types: org.apache.beam.sdk.values.POutput cannot be
> converted to
> org.apache.beam.sdk.values.PCollection<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.String>>
>
>     PCollection<KV<String, String>> kafkaRead = p.apply(KafkaIO.read()
>         .withTopics(ImmutableList.of("Topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .updateConsumerProperties((Map) Maps.fromProperties(properties))
>         .withoutMetadata());
>
>     kafkaRead
>         .apply("Transform ", MapElements.via(
>             new SimpleFunction<KV<String, String>, String>() {
>                 @Override
>                 public String apply(KV<String, String> input) {
>                     return input.getKey() + " " + input.getValue();
>                 }
>             }))
>         .apply(TextIO.Write.to("Location"));
>
> Thanks,
> Naveen
>
> From: "Madhire, Naveen" <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, January 12, 2017 at 11:55 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: KafkaIO Example
>
> Yes, I've verified the Kafka connectivity outside of Beam. I wanted to see
> how it works if I read Kafka data using Beam.
>
> I changed the code to use the default direct runner and am getting a
> compilation error now:
>
> java: cannot find symbol
>   symbol: method apply(java.lang.String,org.apache.beam.sdk.transforms.MapElements<org.apache.beam.sdk.values.KV<java.lang.String,java.lang.String>,java.lang.String>)
>   location: interface org.apache.beam.sdk.values.POutput
>
> Am I missing any other dependency in the pom? Please let me know.
>
> Code:
>
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline p = Pipeline.create(options);
>
>     // Read from Kafka topic
>     try {
>         InputStream props = Resources.getResource("client.properties").openStream();
>         Properties properties = new Properties();
>         properties.load(props);
>
>         p.apply(KafkaIO.read()
>                 .withTopics(ImmutableList.of("Topic"))
>                 .withKeyCoder(StringUtf8Coder.of())
>                 .withValueCoder(StringUtf8Coder.of())
>                 .updateConsumerProperties((Map) Maps.fromProperties(properties))
>                 .withoutMetadata())
>             .apply("Transform ", MapElements.via(
>                 new SimpleFunction<KV<String, String>, String>() {
>                     @Override
>                     public String apply(KV<String, String> input) {
>                         return input.getKey() + " " + input.getValue();
>                     }
>                 }))
>             .apply(TextIO.Write.to("Location"));
>     }
>
> I have the dependencies below included in my pom:
>
>     <!-- Apache Beam Dependencies -->
>     <dependency>
>       <groupId>org.apache.beam</groupId>
>       <artifactId>beam-sdks-java-io-kafka</artifactId>
>       <version>0.4.0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.beam</groupId>
>       <artifactId>beam-sdks-java-core</artifactId>
>       <version>0.4.0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.beam</groupId>
>       <artifactId>beam-runners-direct-java</artifactId>
>       <version>0.4.0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.beam</groupId>
>       <artifactId>beam-runners-core-java</artifactId>
>       <version>0.4.0</version>
>     </dependency>
>
>     <!-- Business logic -->
>     <dependency>
>       <groupId>org.drools</groupId>
>       <artifactId>drools-core</artifactId>
>       <version>6.2.0.Final</version>
>     </dependency>
>
> Thanks,
> Naveen
>
> From: Lukasz Cwik <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, January 12, 2017 at 10:58 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: KafkaIO Example
>
> I'm assuming you mean DirectRunner and not DataflowDirectRunner.
>
> On Wed, Jan 11, 2017 at 4:23 PM, Raghu Angadi <[email protected]> wrote:
>
> > On Wed, Jan 11, 2017 at 1:56 PM, Madhire, Naveen <[email protected]> wrote:
> >
> > > I can confirm the authorization and authentication to Kafka is behaving
> > > correctly,
> >
> > Did you confirm this within the Beam/KafkaIO context or outside? When I
> > last checked, Kafka requires the credentials file to be available on the
> > local filesystem where the KafkaConsumer runs. All the workers need to
> > have this file. Shipping these files to workers depends on the
> > runner/execution environment.
> >
> > Can you try with DataflowDirectRunner? That would be simpler to debug.
> >
> > Raghu.
>
> ------------------------------
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
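
P.S. To illustrate the raw-type guess at the top of this reply, here is a minimal sketch that does not use Beam at all. Output, Coll, Transform, Upper and RawTypeDemo are hypothetical stand-ins I made up to play the roles of POutput, PCollection, PTransform, a concrete transform and Pipeline.apply; they are assumptions for illustration, not Beam's actual classes or signatures. It only shows the general Java rule that once a generic transform is seen through its raw type, the result of apply is typed as the erased bound rather than the specific collection type.

```java
// Hypothetical stand-ins for illustration only; these are NOT Beam classes.
interface Output {}                                // plays the role of POutput

final class Coll<T> implements Output {            // plays the role of PCollection<T>
    final T value;
    Coll(T value) { this.value = value; }
}

abstract class Transform<I, O extends Output> {    // plays the role of PTransform
    abstract O expand(I input);
}

final class Upper extends Transform<String, Coll<String>> {
    @Override
    Coll<String> expand(String input) {
        return new Coll<>(input.toUpperCase());
    }
}

class RawTypeDemo {
    // Shaped like an apply method: the result type is the transform's O.
    static <O extends Output> O apply(String input, Transform<String, O> transform) {
        return transform.expand(input);
    }

    // Fully typed transform: O is inferred as Coll<String>.
    static String typed() {
        Coll<String> out = apply("kafka", new Upper());
        return out.value;
    }

    // Transform upcast to its raw type: the call needs an unchecked
    // conversion, so the compiler types the result as the erasure of O,
    // which is Output.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String raw() {
        Transform rawTransform = new Upper();
        // The next line would be rejected by javac as incompatible types,
        // because Output cannot be converted to Coll<String>:
        // Coll<String> broken = apply("kafka", rawTransform);
        Output out = apply("kafka", rawTransform);
        return ((Coll<String>) out).value;         // explicit cast needed to recover the type
    }

    public static void main(String[] args) {
        System.out.println(typed());
        System.out.println(raw());
    }
}
```

If something in the real pipeline is being treated as a raw type somewhere along the chain, the same effect would explain both messages in this thread: the result of apply is typed as POutput, which can neither be assigned to a PCollection nor has an apply method of its own.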
