The google.dataflow SDK and Beam have pretty much the same API.
Try posting the code that fails to compile; that will make it simpler to
suggest a fix. Something like the following should work:
    .apply(...)
    .apply(ParDo.of(new DoFn<String, Void>() {
      @Override
      public void processElement(ProcessContext ctx) throws Exception {
        // Print each record value read from Kafka, one per line.
        System.out.printf("read '%s' from kafka%n", ctx.element());
      }
    }));

On Fri, Apr 29, 2016 at 3:39 PM, amir bahmanyari wrote:
> Thanks Raghu.
> I did set it to 10, it compiled and p.run() runs with no exceptions.
>
> PCollection<String> kafkarecords = p
>     .apply(KafkaIO.read()
>         .withBootstrapServers("kirkhost:9092")
>         .withTopics(topics)
>         .withMaxNumRecords(10)
>         .withValueCoder(StringUtf8Coder.of())
>         .withoutMetadata())
>     .apply(Values.<String>create());
> p.run();
>
> How can I print the kafkarecords and/or send them to a TextIO file now?
> Am looking at ParDo option but i get a compile error complaining on
> arguments mismatch.
> Seems like this should work fine with google.dataflow sdk libs, but am
> having trouble with Beam libs.
> Thanks for your help.
> Cheers+have a great weekend.
>
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]
> *Cc:* amir bahmanyari <[email protected]>
> *Sent:* Friday, April 29, 2016 1:21 PM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> Thanks Aljoscha.
>
> Amir,
> You can try setting record limit or max time limit to convert it to a
> bounded source. KafkaIO added this interface mainly for DirectRunner and
> tests.
>
> pipeline.apply(KafkaIO.read()....withMaxNumRecords(10000000))
> ...
>
> Raghu.
>
> On Fri, Apr 29, 2016 at 1:00 PM, Aljoscha Krettek <[email protected]>
> wrote:
>
> Hi,
> I think the problem is that the DirectPipelineRunner seems to not support
> UnboundedSource (of which the UnboundedKafkaSource is an example). You can
> try running it with the InProcessPipelineRunner or the FlinkPipelineRunner.
>
> Cheers,
> Aljoscha
>
> On Fri, 29 Apr 2016 at 19:12 amir bahmanyari <[email protected]> wrote:
>
> Hi colleagues,
> I am moving this conversation to this users mailing list as per Max’s
> suggestion.
> Thanks Max.
>
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception I sent last night pls?
> When would the sample code to use KafkaIO be released?
> I really appreciate your valuable time. Below is the exception for your
> reference.
> This is how it gets used in my code:
>
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
>
> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>     at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>
>
> Kind Regards,
>
> Amir