Thanks Raghu. I did set it to 10; it compiled and p.run() runs with no
exceptions.
PCollection<String> kafkarecords = p
    .apply(KafkaIO.read()
        .withBootstrapServers("kirkhost:9092")
        .withTopics(topics)
        .withMaxNumRecords(10)
        .withValueCoder(StringUtf8Coder.of())
        .withoutMetadata())
    .apply(Values.<String>create());
p.run();
How can I print the kafkarecords and/or send them to a TextIO file now? I am
looking at the ParDo option, but I get a compile error complaining about an
arguments mismatch. It seems like this should work fine with the google.dataflow
SDK libs, but I am having trouble with the Beam libs. Thanks for your help.
Cheers, and have a great weekend.
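For reference, roughly what I am trying is sketched below (assuming the
old-style DoFn where processElement overrides the abstract method; newer SDKs
use an @ProcessElement annotation and TextIO.write().to(...) instead, and the
output path here is just an illustration):

    // Print each record, then pass it through unchanged.
    PCollection<String> logged = kafkarecords.apply(ParDo.of(new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        System.out.println(c.element());
        c.output(c.element());
      }
    }));

    // Write the bounded collection out as text files (path is illustrative).
    logged.apply(TextIO.Write.to("/tmp/kafkarecords"));

    p.run();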
From: Raghu Angadi <[email protected]>
To: [email protected]
Cc: amir bahmanyari <[email protected]>
Sent: Friday, April 29, 2016 1:21 PM
Subject: Re: KafkaIO Usage & Sample Code
Thanks Aljoscha.
Amir, you can try setting a record limit or a max time limit to convert it to a
bounded source. KafkaIO added this interface mainly for the DirectRunner and tests.
 pipeline.apply(KafkaIO.read()....withMaxNumRecords(10000000))    ...
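For example (a sketch; the broker address is a placeholder, and withMaxReadTime
takes a joda-time Duration, assuming your KafkaIO snapshot has it):

    pipeline
        .apply(KafkaIO.read()
            .withBootstrapServers("kirkhost:9092")          // placeholder broker
            .withTopics(topics)
            .withMaxNumRecords(10000000)                    // record limit, and/or
            .withMaxReadTime(Duration.standardMinutes(5)))  // time limit
        ...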
Raghu.
On Fri, Apr 29, 2016 at 1:00 PM, Aljoscha Krettek <[email protected]> wrote:

Hi,

I think the problem is that the DirectPipelineRunner seems to not support
UnboundedSource (of which the UnboundedKafkaSource is an example). You can try
running it with the InProcessPipelineRunner or the FlinkPipelineRunner.
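For example, a minimal sketch (the runner class names and their packages depend
on the SDK snapshot you are using):

    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(InProcessPipelineRunner.class);  // or FlinkPipelineRunner.class
    Pipeline p = Pipeline.create(options);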
Cheers,
Aljoscha
On Fri, 29 Apr 2016 at 19:12 amir bahmanyari <[email protected]> wrote:

Hi colleagues,

I am moving this conversation to this users mailing list as per Max’s
suggestion. Thanks, Max.

Hi JB, hope all is great. Is there a resolution to the exception I sent last
night, please? When will the sample code to use KafkaIO be released? I really
appreciate your valuable time. Below is the exception for your reference. This
is how it gets used in my code:
p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
Have a wonderful weekend.

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
        at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
        at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
        at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
        at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
        at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
        at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)

Kind Regards,
Amir