Thanks Aljoscha.
Amir,
You can try setting a record limit or a max read-time limit to convert it
into a bounded source. KafkaIO added this interface mainly for the
DirectRunner and for tests.
pipeline.apply(KafkaIO.read()....withMaxNumRecords(10000000))
...
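
To make the idea concrete, here is a minimal plain-Java sketch of what
bounding an endless source by record count or by elapsed time means. It
does not use Beam or Kafka at all: readBounded and endlessCounter are
hypothetical names made up for this illustration, and this is not how
KafkaIO implements its limits.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BoundedReadSketch {

    /**
     * Pulls records from a source that never ends on its own and stops
     * as soon as maxRecords have been collected or maxTime has elapsed,
     * whichever comes first.
     */
    public static <T> List<T> readBounded(Iterator<T> unbounded, long maxRecords, Duration maxTime) {
        List<T> out = new ArrayList<>();
        long start = System.nanoTime();
        long maxNanos = maxTime.toNanos();
        while (out.size() < maxRecords
                && System.nanoTime() - start < maxNanos
                && unbounded.hasNext()) {
            out.add(unbounded.next());
        }
        return out;
    }

    /** Hypothetical stand-in for a Kafka topic: an iterator that never runs out. */
    public static Iterator<Long> endlessCounter() {
        return new Iterator<Long>() {
            private long next = 0;

            @Override
            public boolean hasNext() {
                return true; // an unbounded source always has more data
            }

            @Override
            public Long next() {
                return next++;
            }
        };
    }

    public static void main(String[] args) {
        // The record limit is hit long before the 10-second time limit.
        List<Long> records = readBounded(endlessCounter(), 5, Duration.ofSeconds(10));
        System.out.println(records); // prints [0, 1, 2, 3, 4]
    }
}
```

Without one of the two limits the loop would never finish, which is the
reason a runner that only handles bounded input needs the source capped.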
Raghu.
On Fri, Apr 29, 2016 at 1:00 PM, Aljoscha Krettek <[email protected]>
wrote:
> Hi,
> I think the problem is that the DirectPipelineRunner seems to not support
> UnboundedSource (of which the UnboundedKafkaSource is an example). You can
> try running it with the InProcessPipelineRunner or the FlinkPipelineRunner.
>
> Cheers,
> Aljoscha
>
> On Fri, 29 Apr 2016 at 19:12 amir bahmanyari <[email protected]> wrote:
>
>> Hi colleagues,
>> I am moving this conversation to this users mailing list as per Max’s
>> suggestion.
>> Thanks Max.
>>
>> Hi JB,
>> Hope all is great.
>> Is there a resolution to the exception I sent last night pls?
>> When would the sample code to use KafkaIO be released?
>> I really appreciate your valuable time. Below is the exception for your
>> reference.
>> This is how it gets used in my code:
>>
>>
>> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>>
>> Have a wonderful weekend.
>>
>> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>     at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>
>>
>> Kind Regards,
>>
>> Amir
>>
>