Hi Raghu,I noticed you replied to this thread yesterday regarding users getting
affected by this Kafka version difference etc.And Google DataFlow libs working
fine for Unbound KafkaIO etc.I dont see that email in my inbox anymore. I might
have accidentally deleted it.Could you resend it pls? I appreciate it...Have a
great day.
From: amir bahmanyari <[email protected]>
To: "[email protected]" <[email protected]>
Sent: Wednesday, May 4, 2016 12:57 PM
Subject: Re: KafkaIO Usage & Sample Code
The root cause ended up to be the Kafka version in my lab.Kafka server must be
version 9.0+ for the KafkaIO call in the Beam app code to populate the
PCollection object .Thanks Thomas so much for diagnosing that.Appreciate all
his valuable time he spent with me offline.
Status: Bounded:Works when setting either InProcessPipelineRunner or
DirectPipelineRunner
Unbounded:Throws different exceptions for the different above runners. Thomas
has the different stacktraces.
At the moment, I have withMaxNumRecords(100) set in my KafkaIO call.This causes
the call to block till all the 100 records are received, and then makes it
available to the app to consume it.I tried running the p.run() in a while(true)
loop & setting withMaxNumRecords(100) so I get chunks of 100 records in
multiple files created by TextIO.Didnt cut it...Exceptions complaining
"KafkaConsumer is not safe for multi-threaded access".
I know Unbounded is the ultimate way to achieve a true real-time
streaming.Given that Unbounded is not available at the moment, is there a work
around that makes every single record available immediately to the app?
Thanks everyone again for your valuable help.Amir- From: Thomas Groh
<[email protected]>
To: [email protected]
Sent: Monday, May 2, 2016 11:41 AM
Subject: Re: KafkaIO Usage & Sample Code
Yeah, JB has it - if that commit
(https://github.com/apache/incubator-beam/commit/b2b77e380) is in your history,
the call should compile correctly; if it's not, then the
InProcessPipelineRunner doesn't implement the appropriate interface, and that
call won't typecheck (and lead to your compilation failure) - syncing to a more
recent version should fix the problem.
On Mon, May 2, 2016 at 11:26 AM, Jean-Baptiste Onofré <[email protected]> wrote:
Thomas meant that you have to checkout after or at this commit:
git checkout b2b77e380
;)
Regards
JB
On 05/02/2016 07:52 PM, amir bahmanyari wrote:
Hi Dan,
Sorry! I honestly dont know what "so long as you're on a commit after
b2b77e380" means :)))
If this means there is a specific jar file I need to have on my path,
could you point me to a link where I can get the right jar file pls?
I appreciate i sir.
Amir
------------------------------------------------------------------------
*From:* Thomas Groh <[email protected]>
*To:* [email protected]
*Sent:* Monday, May 2, 2016 10:02 AM
*Subject:* Re: KafkaIO Usage & Sample Code
Your calls should work - so long as you're on a commit after b2b77e380
(when we started implementing PipelineRunner), the
InProcessPipelineRunner should be a valid argument to
PipelineOptions#setRunner
As an example, there's the InProcessPipelineRunnerTest
(https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73)
On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <[email protected]
<mailto:[email protected]>> wrote:
Hi Amir,
The problem is likely in using DataflowPipelineOptions.class -- this
is specific to the Cloud Dataflow service and the
DataflowPipelineRunner. Try using just "PipelineOptions".
Dan
On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <[email protected]
<mailto:[email protected]>> wrote:
Thanks Dan.
I actually had tried it before but got compilation errors at
setting the InProcessPipelineRunner in the PipelineOptions object..
I appreciate it if you point me to a working sample code.
FYI, This is my implementation:
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
DataflowPipelineOptions Myoptions =
PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
Myoptions.setRunner(InProcessPipelineRunner.class);
I cannot set runner as InProcessPipelineRunner in the last line:
The method setRunner(Class<? extends PipelineRunner<?>>) in the
type PipelineOptions is not applicable for the arguments
(Class<InProcessPipelineRunner>).
Thanks for your help.
Amir-
------------------------------------------------------------------------
*From:* Dan Halperin <[email protected]
<mailto:[email protected]>>
*To:* [email protected]
<mailto:[email protected]>
*Sent:* Monday, May 2, 2016 12:23 AM
*Subject:* Re: KafkaIO Usage & Sample Code
On Mon, May 2, 2016 at 12:22 AM, Dan Halperin
<[email protected] <mailto:[email protected]>> wrote:
Hi Amir,
As Frances suggested, you can use the
InProcessPipelineRunner instead of the DirectPipelineRunner
to execute your pipeline. (They're both in the codebase,
it's just that the Direct runner is the default. Use the
--runner command line option.)
Amending: it is relatively unlikely that the issues that we
caught in testing would affect you. So it should be safe for
your use case to do this -- and definitely safe to at least try
it out!
Dan
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari
<[email protected] <mailto:[email protected]>> wrote:
Thanks gents
What are our options in the meanwhile?
Cheers
Sent from my iPhone
On May 2, 2016, at 12:00 AM, Dan Halperin
<[email protected] <mailto:[email protected]>> wrote:
On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré
<[email protected] <mailto:[email protected]>> wrote:
Oh, thanks Frances.
I mixed DirectPipelineRunner ("old" local runner),
and InProcessPipelineRunner ("new" local runner) ;)
We should remove the DirectPipelineRunner to avoid
confusion. WDYT ?
We would like to do this soon, but there are some snags.
As a preparation step, Thomas swapped the default
runner from Direct to InProcess. (#178
<https://github.com/apache/incubator-beam/pull/178>)
However, testing unfortunately exposed some issues
with the InProcess runner. (Actually, I should say
"fortunately" because the tests caught it! Yay!) So we
had to roll it back. (#198
<https://github.com/apache/incubator-beam/pull/198>)
Once we improve the InProcess runner, we can re-do the
default swap. After the swap, once the tests keep
passing for a few days, we do indeed intend to delete
the current Direct pipeline runner and replace it with
the current InProcess runner.
Dan
Regards
JB
On 05/02/2016 03:12 AM, Frances Perry wrote:
+Thomas, author of the InProcessPipelineRunner
The DirectPipelineRunner doesn't yet support
unbounded PCollections. You
can try using the InProcessPipelineRunner,
which is the re-write of
local execution that provides support for
unbounded PCollections and
better checking against the Beam Model. (We'll
be renaming this to the
DirectPipelineRunner in the near future to
avoid having both as soon as
the functionality of the
InProcessPipelineRunner is complete.)
On Sun, May 1, 2016 at 4:38 PM, amir
bahmanyari <[email protected]
<mailto:[email protected]>
<mailto:[email protected]
<mailto:[email protected]>>> wrote:
Hi JB,
I rebuilt my code with the latest :
kafka-0.1.0-incubating-20160501.070733-11.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
<https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
Tried _without setting withMaxNumRecords()_:
Throws java.lang.IllegalStateException: no
evaluator registered for
Read(UnboundedKafkaSource)
at
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
at
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
_With setting ithMaxNumRecords(_), I see
the thread is running, no
exceptions like above, waiting for
incoming Kafka data, but the
method obtaining the data from
processElement(ProcessContext ctx)
never executes.
Therefore, nothing goes into
apply(TextIO.Write.to <http://textio.write.to/>
<http://TextIO.Write.to
<http://textio.write.to/>>("c:\\temp\\KafkaOut\\Kafkadata.txt")).
I see Kafka Broker reports my laptop IP
address as getting a
connection to it, OK.
Everything looks OK at the server side.
Doesn't look like its my lucky day.
I appreciate any help/feedback/suggetion.
Cheers
------------------------------------------------------------------------
*From:* Jean-Baptiste Onofré
<[email protected] <mailto:[email protected]>
<mailto:[email protected] <mailto:[email protected]>>>
*To:* [email protected]
<mailto:[email protected]>
<mailto:[email protected]
<mailto:[email protected]>>
*Sent:* Friday, April 29, 2016 10:36 PM
*Subject:* Re: KafkaIO Usage & Sample Code
As I said in my previous e-mail, until
recently DirectPipelineRunner
didn't support Unbounded.
It's now fixed, so if you take a latest
nightly build, or build master,
it should work.
As workaround, you can also limit the
number of message consumed from
Kafka (and so work with bounded).
Regards
JB
On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> Hi colleagues,
> I am moving this conversation to this
users mailing list as per Max’s
> suggestion.
> Thanks Max.
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception
I sent last night pls?
> When would the sample code to use
KafkaIO be released?
> I really appreciate your valuable time.
Below is the exception
for your
> reference.
> This is how it gets used in my code:
>
>
p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
> Exception in thread "main"
java.lang.IllegalStateException: no
evaluator
> registered for Read(UnboundedKafkaSource)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
>
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> at
>
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> at
>
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> at
>
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> at
org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> at
>
benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> Kind Regards,
> Amir
--
Jean-Baptiste Onofré
[email protected]
<mailto:[email protected]>
<mailto:[email protected]
<mailto:[email protected]>>
http://blog.nanthrax.net
<http://blog.nanthrax.net/>
<http://blog.nanthrax.net/>
Talend - http://www.talend.com
<http://www.talend.com/> <http://www.talend.com/>
--
Jean-Baptiste Onofré
[email protected] <mailto:[email protected]>
http://blog.nanthrax.net <http://blog.nanthrax.net/>
Talend - http://www.talend.com
<http://www.talend.com/>
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com