On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <[email protected]> wrote:

> Hi Amir,
>
> As Frances suggested, you can use the InProcessPipelineRunner instead of
> the DirectPipelineRunner to execute your pipeline. (They're both in the
> codebase, it's just that the Direct runner is the default. Use the --runner
> command line option.)
>

Amending: it is relatively unlikely that the issues that we caught in
testing would affect you. So it should be safe for your use case to do this
-- and definitely safe to at least try it out!


>
> Dan
>
> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <[email protected]>
> wrote:
>
>> Thanks gents
>> What are our options in the meanwhile?
>> Cheers
>>
>> Sent from my iPhone
>>
>> On May 2, 2016, at 12:00 AM, Dan Halperin <[email protected]> wrote:
>>
>> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>>> Oh, thanks Frances.
>>>
>>> I mixed DirectPipelineRunner ("old" local runner), and
>>> InProcessPipelineRunner ("new" local runner) ;)
>>>
>>> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>>>
>>
>> We would like to do this soon, but there are some snags.
>>
>> As a preparation step, Thomas swapped the default runner from Direct to
>> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>>
>> However, testing unfortunately exposed some issues with the InProcess
>> runner. (Actually, I should say "fortunately" because the tests caught it!
>> Yay!) So we had to roll it back. (#198
>> <https://github.com/apache/incubator-beam/pull/198>)
>>
>> Once we improve the InProcess runner, we can re-do the default swap.
>> After the swap, once the tests keep passing for a few days, we do indeed
>> intend to delete the current Direct pipeline runner and replace it with the
>> current InProcess runner.
>>
>> Dan
>>
>>
>>>
>>> Regards
>>> JB
>>>
>>> On 05/02/2016 03:12 AM, Frances Perry wrote:
>>>
>>>> +Thomas, author of the InProcessPipelineRunner
>>>>
>>>> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
>>>> can try using the InProcessPipelineRunner, which is the re-write of
>>>> local execution that provides support for unbounded PCollections and
>>>> better checking against the Beam Model. (We'll be renaming this to the
>>>> DirectPipelineRunner in the near future to avoid having both as soon as
>>>> the functionality of the InProcessPipelineRunner is complete.)
>>>>
>>>> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <[email protected]
>>>> <mailto:[email protected]>> wrote:
>>>>
>>>>     Hi JB,
>>>>     I rebuilt my code with the latest :
>>>>     kafka-0.1.0-incubating-20160501.070733-11.jar
>>>>     <
>>>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
>>>> >
>>>>     java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>>>     <
>>>> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>>>> >
>>>>
>>>>
>>>>     Tried _without setting withMaxNumRecords()_:
>>>>     Throws java.lang.IllegalStateException: no evaluator registered for
>>>>     Read(UnboundedKafkaSource)
>>>>     at
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>>     at
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>>>     at
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>
>>>>     _With setting ithMaxNumRecords(_), I see the thread is running, no
>>>>     exceptions like above, waiting for incoming Kafka data, but the
>>>>     method obtaining the data from processElement(ProcessContext ctx)
>>>>     never executes.
>>>>     Therefore, nothing goes into apply(TextIO.Write.to
>>>>     <http://TextIO.Write.to>("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>>>
>>>>     I see Kafka Broker reports my laptop IP address as getting a
>>>>     connection to it, OK.
>>>>     Everything looks OK at the server side.
>>>>     Doesn't look like its my lucky day.
>>>>     I appreciate any help/feedback/suggetion.
>>>>     Cheers
>>>>
>>>>
>>>> ------------------------------------------------------------------------
>>>>     *From:* Jean-Baptiste Onofré <[email protected] <mailto:
>>>> [email protected]>>
>>>>     *To:* [email protected]
>>>>     <mailto:[email protected]>
>>>>     *Sent:* Friday, April 29, 2016 10:36 PM
>>>>     *Subject:* Re: KafkaIO Usage & Sample Code
>>>>
>>>>
>>>>     As I said in my previous e-mail, until recently DirectPipelineRunner
>>>>     didn't support Unbounded.
>>>>
>>>>     It's now fixed, so if you take a latest nightly build, or build
>>>> master,
>>>>     it should work.
>>>>
>>>>     As workaround, you can also limit the number of message consumed
>>>> from
>>>>     Kafka (and so work with bounded).
>>>>
>>>>     Regards
>>>>     JB
>>>>
>>>>     On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>>>>      > Hi colleagues,
>>>>      > I am moving this conversation to this users mailing list as per
>>>> Max’s
>>>>      > suggestion.
>>>>      > Thanks Max.
>>>>      > Hi JB,
>>>>      > Hope all is great.
>>>>      > Is there a resolution to the exception I sent last night pls?
>>>>      > When would the sample code to use KafkaIO be released?
>>>>      > I really appreciate your valuable time. Below is the exception
>>>>     for your
>>>>      > reference.
>>>>      > This is how it gets used in my code:
>>>>      >
>>>>      >
>>>>
>>>> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>>>>      >
>>>>      > Have a wonderful weekend.
>>>>      > Exception in thread "main" java.lang.IllegalStateException: no
>>>>     evaluator
>>>>      > registered for Read(UnboundedKafkaSource)
>>>>      >        at
>>>>      >
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>>      >        at
>>>>      >
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>>>      >        at
>>>>      >
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>      >        at
>>>>      >
>>>>
>>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>      >        at
>>>>      >
>>>>
>>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>>>      >        at
>>>>      >
>>>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>>>      >        at
>>>>      >
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>>>      >        at
>>>>      >
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>>>      >        at
>>>>      >
>>>>
>>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>>>      >        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>>>      >        at
>>>>      >
>>>>
>>>> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>>>      > Kind Regards,
>>>>      > Amir
>>>>
>>>>
>>>>     --
>>>>     Jean-Baptiste Onofré
>>>>     [email protected] <mailto:[email protected]>
>>>>     http://blog.nanthrax.net <http://blog.nanthrax.net/>
>>>>     Talend - http://www.talend.com <http://www.talend.com/>
>>>>
>>>>
>>>>
>>>>
>>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>>
>

Reply via email to