Thanks Dan.I actually had tried it before but got compilation errors at setting 
the InProcessPipelineRunner  in the PipelineOptions object..I appreciate it if 
you point me to a working sample code.FYI, This is my implementation:import 
com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
DataflowPipelineOptions Myoptions = 
PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);Myoptions.setRunner(InProcessPipelineRunner.class);
I cannot set runner as InProcessPipelineRunner in the last line:The method 
setRunner(Class<? extends PipelineRunner<?>>) in the type PipelineOptions is 
not applicable for the arguments (Class<InProcessPipelineRunner>). Thanks for 
your help.Amir-

      From: Dan Halperin <[email protected]>
 To: [email protected] 
 Sent: Monday, May 2, 2016 12:23 AM
 Subject: Re: KafkaIO Usage & Sample Code
   
On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <[email protected]> wrote:

Hi Amir,
As Frances suggested, you can use the InProcessPipelineRunner instead of the 
DirectPipelineRunner to execute your pipeline. (They're both in the codebase, 
it's just that the Direct runner is the default. Use the --runner command line 
option.)

Amending: it is relatively unlikely that the issues that we caught in testing 
would affect you. So it should be safe for your use case to do this -- and 
definitely safe to at least try it out! 

Dan
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <[email protected]> wrote:

Thanks gentsWhat are our options in the meanwhile?Cheers 

Sent from my iPhone
On May 2, 2016, at 12:00 AM, Dan Halperin <[email protected]> wrote:


On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <[email protected]> wrote:

Oh, thanks Frances.

I mixed DirectPipelineRunner ("old" local runner), and InProcessPipelineRunner 
("new" local runner) ;)

We should remove the DirectPipelineRunner to avoid confusion. WDYT ?


We would like to do this soon, but there are some snags.
As a preparation step, Thomas swapped the default runner from Direct to 
InProcess. (#178)
However, testing unfortunately exposed some issues with the InProcess runner. 
(Actually, I should say "fortunately" because the tests caught it! Yay!) So we 
had to roll it back. (#198)
Once we improve the InProcess runner, we can re-do the default swap. After the 
swap, once the tests keep passing for a few days, we do indeed intend to delete 
the current Direct pipeline runner and replace it with the current InProcess 
runner.
Dan 

Regards
JB

On 05/02/2016 03:12 AM, Frances Perry wrote:

+Thomas, author of the InProcessPipelineRunner

The DirectPipelineRunner doesn't yet support unbounded PCollections. You
can try using the InProcessPipelineRunner, which is the re-write of
local execution that provides support for unbounded PCollections and
better checking against the Beam Model. (We'll be renaming this to the
DirectPipelineRunner in the near future to avoid having both as soon as
the functionality of the InProcessPipelineRunner is complete.)

On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <[email protected]
<mailto:[email protected]>> wrote:

    Hi JB,
    I rebuilt my code with the latest :
    kafka-0.1.0-incubating-20160501.070733-11.jar
    
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
    java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
    
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>


    Tried _without setting withMaxNumRecords()_:
    Throws java.lang.IllegalStateException: no evaluator registered for
    Read(UnboundedKafkaSource)
    at
    
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
    at
    
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at
    
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)

    _With setting ithMaxNumRecords(_), I see the thread is running, no
    exceptions like above, waiting for incoming Kafka data, but the
    method obtaining the data from processElement(ProcessContext ctx)
    never executes.
    Therefore, nothing goes into apply(TextIO.Write.to
    <http://TextIO.Write.to>("c:\\temp\\KafkaOut\\Kafkadata.txt")).

    I see Kafka Broker reports my laptop IP address as getting a
    connection to it, OK.
    Everything looks OK at the server side.
    Doesn't look like its my lucky day.
    I appreciate any help/feedback/suggetion.
    Cheers

    ------------------------------------------------------------------------
    *From:* Jean-Baptiste Onofré <[email protected] <mailto:[email protected]>>
    *To:* [email protected]
    <mailto:[email protected]>
    *Sent:* Friday, April 29, 2016 10:36 PM
    *Subject:* Re: KafkaIO Usage & Sample Code

    As I said in my previous e-mail, until recently DirectPipelineRunner
    didn't support Unbounded.

    It's now fixed, so if you take a latest nightly build, or build master,
    it should work.

    As workaround, you can also limit the number of message consumed from
    Kafka (and so work with bounded).

    Regards
    JB

    On 04/29/2016 07:12 PM, amir bahmanyari wrote:
     > Hi colleagues,
     > I am moving this conversation to this users mailing list as per Max’s
     > suggestion.
     > Thanks Max.
     > Hi JB,
     > Hope all is great.
     > Is there a resolution to the exception I sent last night pls?
     > When would the sample code to use KafkaIO be released?
     > I really appreciate your valuable time. Below is the exception
    for your
     > reference.
     > This is how it gets used in my code:
     >
     >
    
p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
     >
     > Have a wonderful weekend.
     > Exception in thread "main" java.lang.IllegalStateException: no
    evaluator
     > registered for Read(UnboundedKafkaSource)
     >        at
     >
    
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
     >        at
     >
    
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
     >        at
     >
    
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
     >        at
     >
    
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
     >        at
     >
    
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
     >        at
     > org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
     >        at
     >
    
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
     >        at
     >
    
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
     >        at
     >
    
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
     >        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
     >        at
     >
    benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
     > Kind Regards,
     > Amir


    --
    Jean-Baptiste Onofré
    [email protected] <mailto:[email protected]>
    http://blog.nanthrax.net <http://blog.nanthrax.net/>
    Talend - http://www.talend.com <http://www.talend.com/>






-- 
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com









  

Reply via email to