Re: guava collections and kryo under spark runner

2017-03-24 Thread Aviem Zur
Hi Antony.

Spark uses serializers to serialize data; however, this clashes with Beam's
concept of coders, so we should be using coders instead of Spark's
serializer (specifically, in our configuration, Kryo is used as Spark's
serializer).

From your stack trace it seems that Kryo is being used to serialize your
class my.pkg.types.MyType. This shouldn't happen.
My guess is that we are accidentally using Spark's serializer (Kryo)
somewhere instead of coders.

If you share your pipeline (feel free to redact anything pertaining to your
organization) it will help us locate where this issue is happening.
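
For reference, here is a minimal sketch (not from this thread) of how a custom
type can be tied to a Beam coder so that values of that type are encoded by a
coder rather than falling back to Spark's Kryo serializer. `MyType` and its
fields are hypothetical stand-ins for the class in the stack trace; annotating
the POJO with `@DefaultCoder(AvroCoder.class)` is one common option, and calling
`setCoder(AvroCoder.of(MyType.class))` on the PCollection works as well.

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// Hypothetical stand-in for my.pkg.types.MyType from the stack trace.
// The annotation tells Beam which coder to use for this type, so the
// WindowedValues holding it are encoded by the coder, not by Kryo.
@DefaultCoder(AvroCoder.class)
public class MyType {
  private String id;
  private long count;

  // AvroCoder needs a no-arg constructor.
  public MyType() {}

  public MyType(String id, long count) {
    this.id = id;
    this.count = count;
  }
}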


On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré 
wrote:

> OK, discussing with Aviem, the problem is that Kryo is not able to
> serialize
> Guava collections (it's a known issue).
>
> The question is why Kryo wants to serialize the collections (it could be
> related
> to a change in the Windowing code).
>
> Aviem and I are taking a look on that.
>
> Regards
> JB
>
> On 03/24/2017 09:10 AM, Antony Mayi wrote:
> > I am on 0.6.0
> >
> > thx,
> > a.
> >
> >
> > On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré 
> wrote:
> >
> >
> > Hi Antony,
> >
> > which Beam version are you using ? We did some improvement about guava
> shading
> > recently, wanted to check if it's related.
> >
> > Regards
> > JB
> >
> > On 03/24/2017 08:03 AM, Antony Mayi wrote:
> >> Hi,
> >>
> >> I am using guava's collections (immutables from 21.0) in my beam
> pipelines and
> >> when running on spark runner it fails due to kryo unable to serialize
> those. I
> >> can see there have been some approaches addressing this using
> >> de.javakaffee.kryo-serializers
> >> ->
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> >> but that's been removed recently.
> >>
> >> how should I solve this?
> >>
> >> the stacktrace is below.
> >>
> >> thanks,
> >> antony.
> >>
> >>
> >> [WARNING]
> >> java.lang.RuntimeException: org.apache.spark.SparkException: Job
> aborted due to
> >> stage failure: Exception while getting task result:
> >> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> >> Serialization trace:
> >> fields (my.pkg.types.MyType)
> >> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
> >> at my.pkg.Main.main(Main.java:33)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
> >> at java.lang.Thread.run(Thread.java:745)
> >
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org 
> > http://blog.nanthrax.net 
> > Talend - http://www.talend.com 
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: guava collections and kryo under spark runner

2017-03-24 Thread Aviem Zur
Hi Antony,

Thanks for sharing your code!

I created a test that uses the exact pipeline. I couldn't find the `Row`
class referred to in your pipeline so I created it as a POJO and registered
its coder as `AvroCoder`.

Unfortunately this test passes so it does not reproduce the issue you are
experiencing.
Please find the test in the following gist
https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12

Can you try to tweak it to be more like your use case in which you hit the
exception?

On Fri, Mar 24, 2017 at 3:09 PM Antony Mayi <antonym...@yahoo.com> wrote:

> sorry, wrong version of the file. now corrected:
> a.
>
>
> On Friday, 24 March 2017, 13:06, Antony Mayi <antonym...@yahoo.com> wrote:
>
>
> Hi Aviem,
>
> it took me a while to narrow it down to a simple reproducible case but
> here it is. The problem appears to be related to Combine.globally().
> Attached is my demo code showing the error.
>
> Thanks,
> a.
>
>
> On Friday, 24 March 2017, 10:19, Aviem Zur <aviem...@gmail.com> wrote:
>
>
> Hi Antony.
>
> Spark uses serializers to serialize data, however this clashes with Beam's
> concept of coders, so we should be using coders instead of Spark's
> serializer (Specifically, in our configuration, Kryo is used as Spark's
> serializer).
>
> From your stack trace it seems that Kryo is being used to serialize your
> class my.pkg.types.MyType . This shouldn't happen.
> My guess is we are accidentally using Spark's serializer (Kryo) somewhere
> instead of coders.
>
> If you share your pipeline (feel free to redact anything pertaining to
> your organization) it will help us locate where this issue is happening.
>
>
> On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> OK, discussing with Aviem, the problem is that Kryo is not able to
> serialize
> Guava collections (it's a known issue).
>
> The question is why Kryo wants to serialize the collections (it could be
> related
> to a change in the Windowing code).
>
> Aviem and I are taking a look on that.
>
> Regards
> JB
>
> On 03/24/2017 09:10 AM, Antony Mayi wrote:
> > I am on 0.6.0
> >
> > thx,
> > a.
> >
> >
> > On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> >
> >
> > Hi Antony,
> >
> > which Beam version are you using ? We did some improvement about guava
> shading
> > recently, wanted to check if it's related.
> >
> > Regards
> > JB
> >
> > On 03/24/2017 08:03 AM, Antony Mayi wrote:
> >> Hi,
> >>
> >> I am using guava's collections (immutables from 21.0) in my beam
> pipelines and
> >> when running on spark runner it fails due to kryo unable to serialize
> those. I
> >> can see there have been some approaches addressing this using
> >> de.javakaffee.kryo-serializers
> >> ->
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> >> but that's been removed recently.
> >>
> >> how should I solve this?
> >>
> >> the stacktrace is below.
> >>
> >> thanks,
> >> antony.
> >>
> >>
> >> [WARNING]
> >> java.lang.RuntimeException: org.apache.spark.SparkException: Job
> aborted due to
> >> stage failure: Exception while getting task result:
> >> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> >> Serialization trace:
> >> fields (my.pkg.types.MyType)
> >> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
> >> at my.pkg.Main.main(Main.java:33)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> at
> >>
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
> >> at java.lang.Thread.run(Thread.java:745)
> >
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org <mailto:jbono...@apache.org>
> > http://blog.nanthrax.net <http://blog.nanthrax.net/>
> > Talend - http://www.talend.com <http://www.talend.com/>
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>
>
>


Re: guava collections and kryo under spark runner

2017-03-24 Thread Aviem Zur
Oh yes I see your second version now, that indeed reproduces the issue,
thanks!
I'll update the gist to include this change.
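
To make the discussion concrete, below is a rough, self-contained sketch (not
the code from the gist; the `Row` POJO and its fields are made up for the
example) of the kind of pipeline under discussion: a Combine.globally() whose
output is a Guava ImmutableList. If that value travels through Spark's Kryo
serializer instead of a Beam coder, deserialization fails with the
UnsupportedOperationException seen in the stack trace.

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.ImmutableList;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class CombineGloballySketch {

  // Hypothetical stand-in for the Row POJO used in the gist.
  @DefaultCoder(AvroCoder.class)
  public static class Row {
    String name;
    long value;

    public Row() {}

    public Row(String name, long value) {
      this.name = name;
      this.value = value;
    }
  }

  // Collects all rows into a Guava ImmutableList -- the kind of value Kryo
  // cannot rebuild (ImmutableList.add() throws UnsupportedOperationException).
  public static class ToImmutableList extends CombineFn<Row, List<Row>, List<Row>> {
    @Override
    public List<Row> createAccumulator() {
      return new ArrayList<>();
    }

    @Override
    public List<Row> addInput(List<Row> accumulator, Row input) {
      accumulator.add(input);
      return accumulator;
    }

    @Override
    public List<Row> mergeAccumulators(Iterable<List<Row>> accumulators) {
      List<Row> merged = new ArrayList<>();
      for (List<Row> accumulator : accumulators) {
        merged.addAll(accumulator);
      }
      return merged;
    }

    @Override
    public List<Row> extractOutput(List<Row> accumulator) {
      return ImmutableList.copyOf(accumulator);
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<Row> rows = p.apply(Create.of(new Row("a", 1), new Row("b", 2)));

    // The globally combined ImmutableList ends up inside a WindowedValue; it
    // must be encoded by a coder, not by Spark's Kryo serializer.
    rows.apply(Combine.globally(new ToImmutableList()));

    p.run().waitUntilFinish();
  }
}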

On Fri, Mar 24, 2017 at 3:42 PM Antony Mayi <antonym...@yahoo.com> wrote:

> Hi Aviem,
>
> Apologies for the confusion - did you see my second version of the file I
> sent shortly after the first one? That second one had the Row class
> included (using just "implements Serializable").
>
> Thanks,
> a.
>
>
> On Friday, 24 March 2017, 13:36, Aviem Zur <aviem...@gmail.com> wrote:
>
>
> Hi Antony,
>
> Thanks for sharing your code!
>
> I created a test that uses the exact pipeline. I couldn't find the `Row`
> class referred to in your pipeline so I created it as a POJO and registered
> its coder as `AvroCoder`.
>
> Unfortunately this test passes so it does not reproduce the issue you are
> experiencing.
> Please find the test in the following gist
> https://gist.github.com/aviemzur/4ef08e440f989b29cb6f890ddf1f7e12
>
> Can you try to tweak it to be more like your use case in which you hit the
> exception?
>
> On Fri, Mar 24, 2017 at 3:09 PM Antony Mayi <antonym...@yahoo.com> wrote:
>
> sorry, wrong version of the file. now corrected:
> a.
>
>
> On Friday, 24 March 2017, 13:06, Antony Mayi <antonym...@yahoo.com> wrote:
>
>
> Hi Aviem,
>
> it took me a while to narrow it down to a simple reproducible case but
> here it is. The problem appears to be related to Combine.globally().
> Attached is my demo code showing the error.
>
> Thanks,
> a.
>
>
> On Friday, 24 March 2017, 10:19, Aviem Zur <aviem...@gmail.com> wrote:
>
>
> Hi Antony.
>
> Spark uses serializers to serialize data, however this clashes with Beam's
> concept of coders, so we should be using coders instead of Spark's
> serializer (Specifically, in our configuration, Kryo is used as Spark's
> serializer).
>
> From your stack trace it seems that Kryo is being used to serialize your
> class my.pkg.types.MyType . This shouldn't happen.
> My guess is we are accidentally using Spark's serializer (Kryo) somewhere
> instead of coders.
>
> If you share your pipeline (feel free to redact anything pertaining to
> your organization) it will help us locate where this issue is happening.
>
>
> On Fri, Mar 24, 2017 at 11:14 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> OK, discussing with Aviem, the problem is that Kryo is not able to
> serialize
> Guava collections (it's a known issue).
>
> The question is why Kryo wants to serialize the collections (it could be
> related
> to a change in the Windowing code).
>
> Aviem and I are taking a look on that.
>
> Regards
> JB
>
> On 03/24/2017 09:10 AM, Antony Mayi wrote:
> > I am on 0.6.0
> >
> > thx,
> > a.
> >
> >
> > On Friday, 24 March 2017, 8:58, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> >
> >
> > Hi Antony,
> >
> > which Beam version are you using ? We did some improvement about guava
> shading
> > recently, wanted to check if it's related.
> >
> > Regards
> > JB
> >
> > On 03/24/2017 08:03 AM, Antony Mayi wrote:
> >> Hi,
> >>
> >> I am using guava's collections (immutables from 21.0) in my beam
> pipelines and
> >> when running on spark runner it fails due to kryo unable to serialize
> those. I
> >> can see there have been some approaches addressing this using
> >> de.javakaffee.kryo-serializers
> >> ->
> https://github.com/apache/beam/commits/master/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
> >> but that's been removed recently.
> >>
> >> how should I solve this?
> >>
> >> the stacktrace is below.
> >>
> >> thanks,
> >> antony.
> >>
> >>
> >> [WARNING]
> >> java.lang.RuntimeException: org.apache.spark.SparkException: Job
> aborted due to
> >> stage failure: Exception while getting task result:
> >> com.esotericsoftware.kryo.KryoException:
> java.lang.UnsupportedOperationException
> >> Serialization trace:
> >> fields (my.pkg.types.MyType)
> >> value (org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow)
> >> at
> >>
> >
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> >> at
> >>
> >
>

Re: exceptions in PTransform

2017-03-09 Thread Aviem Zur
Hi Antony.

You are correct, PTransform#expand cannot throw checked exceptions. What
you need to do is wrap your checked exception in a runtime exception.
For example:
https://github.com/apache/beam/blob/adba4c660ef54b98055d30ee1ad7cbf440420030/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java#L148-L148


For more information see the PTransform style guide:
https://beam.apache.org/contribute/ptransform-style-guide/#runtime-errors-and-data-consistency
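
As a concrete illustration, here is a minimal, hypothetical transform (the
class name, config file, and filter logic are all made up for the example)
showing the pattern of wrapping a checked exception thrown during expand():

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

// PTransform#expand is not declared to throw checked exceptions, so the
// IOException from reading the config file is wrapped in a RuntimeException.
public class FilterByConfiguredPrefix
    extends PTransform<PCollection<String>, PCollection<String>> {

  private final String configPath;

  public FilterByConfiguredPrefix(String configPath) {
    this.configPath = configPath;
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    final String prefix;
    try {
      // The checked IOException cannot propagate out of expand()...
      prefix = new String(
          Files.readAllBytes(Paths.get(configPath)), StandardCharsets.UTF_8).trim();
    } catch (IOException e) {
      // ...so wrap it and let pipeline construction fail with an unchecked exception.
      throw new RuntimeException("Unable to read prefix from " + configPath, e);
    }
    return input.apply(Filter.by((String element) -> element.startsWith(prefix)));
  }
}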

Aviem.

On Thu, Mar 9, 2017 at 10:46 AM Antony Mayi  wrote:

> Hi,
>
> when implementing PTransform.expand what can I do with checked exceptions
> that might be thrown? It is not declared to throw anything checked so how
> can I fail it?
>
> Thanks Antony.
>


Re: Spark job hangs up at Evaluating ParMultiDo(ParseInput)

2017-08-10 Thread Aviem Zur
Hi Jayaraman,

Thanks for reaching out.
We run Beam on the Spark runner daily on a YARN cluster.

It appears that in many of the logs you sent there is hanging when
connecting to certain servers on certain ports. Could this be a network
issue or an issue with your Spark setup?

Could you please share which version of Beam you are running?

On Thu, Aug 3, 2017 at 12:18 PM Sathish Jayaraman 
wrote:

> Hi,
>
> Thanks for trying it out.
>
> I was running the job in a local single-node setup. I also spawned an
> HDInsights cluster on the Azure platform just to test the WordCount program.
> It's the same result there too: stuck at the Evaluating ParMultiDo step. It
> runs fine with mvn compile exec, but when bundled into a jar & submitted via
> spark-submit there is no result. If there is no support from Beam to run on
> top of Spark then I have to write native Spark code, which is what I am
> doing currently.
>
> Regards,
> Sathish. J
>
>
> On 03-Aug-2017, at 2:34 PM, Jean-Baptiste Onofré  wrote:
>
> nanthrax.net 
>
>
>


Re: A problem about additional outputs

2017-04-27 Thread Aviem Zur
Yes. Spark streaming support is still experimental and this issue exists in
Beam 0.6.0.

This has since been fixed and the fix will be a part of the upcoming
release.

Since this isn't the first time a user has encountered this I've created a
JIRA ticket for better visibility for this issue:
https://issues.apache.org/jira/browse/BEAM-2106

Thanks for reaching out! Please feel free to try out your pipeline using
Beam master branch or one of the nightly SNAPSHOT builds.
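
For reference, the shape of pipeline affected is a ParDo with additional
output tags. Here is a minimal, self-contained sketch (hypothetical values,
using a bounded Create source for brevity and the current c.output(tag, value)
API; in Beam 0.6.0 the equivalent call was c.sideOutput(tag, value) and the
ParDo was built via ParDo.withOutputTags(...).of(...)):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class MultiOutputSketch {

  // Anonymous subclasses keep the type information for coder inference.
  static final TupleTag<String> VALID = new TupleTag<String>() {};
  static final TupleTag<String> INVALID = new TupleTag<String>() {};

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> lines = p.apply(Create.of("a", "", "b"));

    // One main output (VALID) plus one additional output (INVALID).
    PCollectionTuple outputs = lines.apply(
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            if (c.element().isEmpty()) {
              c.output(INVALID, c.element()); // additional output
            } else {
              c.output(c.element()); // main output
            }
          }
        }).withOutputTags(VALID, TupleTagList.of(INVALID)));

    outputs.get(VALID);   // PCollection<String> of non-empty lines
    outputs.get(INVALID); // PCollection<String> of empty lines

    p.run().waitUntilFinish();
  }
}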

On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498...@qq.com> wrote:

> Here is my maven configuration, thank you.
>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-core</artifactId>
>   <version>0.6.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-direct-java</artifactId>
>   <version>0.6.0</version>
>   <scope>runtime</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>0.6.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-spark</artifactId>
>   <version>0.6.0</version>
> </dependency>
>
>
> On 26 Apr 2017, at 6:58 PM, Aviem Zur <aviem...@gmail.com> wrote:
>
> Hi,
>
> Can you please share which version of Beam you are using?
>
> On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498...@qq.com> wrote:
>
>> hi, here is my program about additional outputs for Apache Beam and
>> the result:
>> public class DataExtraction2 {
>> public static void main(String[] args) {
>> System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
>> SparkPipelineOptions options =
>> PipelineOptionsFactory.as(SparkPipelineOptions.class);
>> options.setSparkMaster("local[4]");
>> //options.setCheckpointDir("./checkpoint");
>> options.setRunner(SparkRunner.class);
>> //options.setRunner(DirectRunner.class);
>> options.setStorageLevel("MEMORY_ONLY");
>> options.setAppName("testMavenDependency");
>> options.setBatchIntervalMillis(1000L);
>> options.setEnableSparkMetricSinks(false);
>> Pipeline p = Pipeline.create(options);
>> List topics = Arrays.asList("beamOnSparkTest".split(","));
>>
>> final TupleTag rawDataTag = new TupleTag() {
>> };
>>
>> final TupleTag exceptionTag = new TupleTag() {
>> };
>> final TupleTag riskEventLogTag = new TupleTag() {
>> };
>> final TupleTag statisticsTag = new TupleTag() {
>> };
>> final TupleTag errorTargetLogTag = new TupleTag()
>> {
>> };
>> final TupleTag equipmentLogTag = new TupleTag() {
>> };
>> final TupleTag performanceLogTag = new TupleTag()
>> {
>> };
>> PCollection rawData = p.apply(KafkaIO.<Void, String>read()
>> .withBootstrapServers("172.17.1.138:9092,
>> 172.17.1.137:9092")
>> .withTopics(topics)
>> .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
>> .withKeyCoder(VoidCoder.of())
>> .withValueCoder(StringUtf8Coder.of())
>> .withoutMetadata()
>> ).apply(Values.create());
>> PCollectionTuple results = rawData.apply(
>> ParDo.withOutputTags(rawDataTag,
>> TupleTagList.of(exceptionTag)
>> .and(riskEventLogTag)
>> .and(statisticsTag)
>> .and(errorTargetLogTag)
>> .and(equipmentLogTag)
>> .and(performanceLogTag))
>> .of(new DoFn<String, String>() {
>> @ProcessElement
>> public void processElement(ProcessContext c) {
>> String idCoop = "";
>> int eventType = 0;
>> int osPlatformType = -1;
>> String innerDecision = "";
>> String outterDecision = "";
>> // Date appTime = new Date();
>> String eventId = "";
>> //String strategyList = "";
>> String uuid = "";
>> String phoneNo = "";
>> int equipmentType = -1;
>> int antiFraudTime = -1;
>>   

Re: Apache Beam Slack channel

2017-04-30 Thread Aviem Zur
Invitation sent.

On Sun, Apr 30, 2017 at 4:41 PM Lars BK <larsbkrog...@gmail.com> wrote:

> Hi,
>
> I would like to request an invite to the Slack team too.
>
> Regards,
> Lars
>
>
> On 2017-04-28 15:40 (+0200), Ismaël Mejía <ieme...@gmail.com> wrote:
> > Done.
> >
> > On Fri, Apr 28, 2017 at 3:32 PM, Andrew Psaltis <
> psaltis.and...@gmail.com>
> > wrote:
> >
> > > Please add me as well. Thanks,
> > >
> > > On Fri, Apr 28, 2017 at 7:59 AM, Anuj Kumar <anujs...@gmail.com>
> wrote:
> > >
> > >> Thanks
> > >>
> > >> On Fri, Apr 28, 2017 at 3:56 PM, Aviem Zur <aviem...@gmail.com>
> wrote:
> > >>
> > >>> Invitation sent.
> > >>>
> > >>> On Fri, Apr 28, 2017 at 1:24 PM Anuj Kumar <anujs...@gmail.com>
> wrote:
> > >>>
> > >>>> Please add me. Thanks.
> > >>>>
> > >>>> On Fri, Apr 28, 2017 at 9:20 AM, Tom Pollard <
> > >>>> tpoll...@flashpoint-intel.com> wrote:
> > >>>>
> > >>>>> Done
> > >>>>>
> > >>>>>
> > >>>>> On Apr 27, 2017, at 11:48 PM, Sai Boorlagadda <
> > >>>>> sai.boorlaga...@gmail.com> wrote:
> > >>>>>
> > >>>>> Please include me as well.
> > >>>>>
> > >>>>> Sai
> > >>>>>
> > >>>>> On Thu, Apr 27, 2017 at 5:59 PM, Davor Bonaci <da...@apache.org>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> (There were already done by someone.)
> > >>>>>>
> > >>>>>> On Thu, Apr 27, 2017 at 1:53 PM, Tony Moulton <
> > >>>>>> tmoul...@flashpoint-intel.com> wrote:
> > >>>>>>
> > >>>>>>> Please include me as well during the next batch of Slack
> additions.
> > >>>>>>> Thanks!
> > >>>>>>>
> > >>>>>>> —
> > >>>>>>> Tony
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Apr 27, 2017, at 4:51 PM, <oscar.b.rodrig...@accenture.com> <
> > >>>>>>> oscar.b.rodrig...@accenture.com> wrote:
> > >>>>>>>
> > >>>>>>> Hi there,
> > >>>>>>>
> > >>>>>>> Can you please add me to the Apache Beam Slack channel?
> > >>>>>>>
> > >>>>>>> Thanks
> > >>>>>>> -Oscar
> > >>>>>>>
> > >>>>>>> Oscar Rodriguez
> > >>>>>>> Solution Architect
> > >>>>>>> Google CoE | Accenture Cloud
> > >>>>>>> M +1 718-440-0881 | W +1 917-452-3923
> > >>>>>>> email: oscar.b.rodrig...@accenture.com
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>>
> > >>>>>>> This message is for the designated recipient only and may contain
> > >>>>>>> privileged, proprietary, or otherwise confidential information.
> If you have
> > >>>>>>> received it in error, please notify the sender immediately and
> delete the
> > >>>>>>> original. Any other use of the e-mail by you is prohibited.
> Where allowed
> > >>>>>>> by local law, electronic communications with Accenture and its
> affiliates,
> > >>>>>>> including e-mail and instant messaging (including content), may
> be scanned
> > >>>>>>> by our systems for the purposes of information security and
> assessment of
> > >>>>>>> internal compliance with Accenture policy.
> > >>>>>>> 
> > >>>>>>> __
> > >>>>>>>
> > >>>>>>> www.accenture.com
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>
> > >
> > >
> > > --
> > > Thanks,
> > > Andrew
> > >
> > > Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> > > <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > > twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
> > >
> >
>


Re: A problem about additional outputs

2017-05-03 Thread Aviem Zur
By "cannot run normally" do you mean you get an exception? We recently had
a bug on master in which streaming pipelines containing `ParDo` with
multiple outputs ran into `NullPointerException`. This was fixed here:
https://issues.apache.org/jira/browse/BEAM-2029
Is this what you're facing? If so, does pulling master and rebuilding help?

On Thu, May 4, 2017 at 5:37 AM zhenglin.Tian <zhenglin.t...@cafintech.com>
wrote:

> hi, I am having trouble with additional outputs with SparkRunner.
> Here is my code. When I use DirectRunner, everything runs OK, but if I
> replace DirectRunner with SparkRunner, the code can't run normally.
>
> public class UnifiedDataExtraction {
>
> private static TupleTag<String> rawDataTag = new TupleTag<String>() {
> };
>
> private static TupleTag<String> exceptionTag = new TupleTag<String>() {
> };
>
> public static void main(String[] args) {
> System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);
>
> SparkPipelineOptions options =
> PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
> options.setSparkMaster(ConstantsOwn.SPARK_MASTER);
> options.setRunner(SparkRunner.class);
> //options.setRunner(DirectRunner.class);
> options.setStorageLevel("MEMORY_ONLY");
> options.setAppName(ConstantsOwn.SPARK_APPNAME);
> options.setBatchIntervalMillis(1000L);
> options.setEnableSparkMetricSinks(false);
> Pipeline p = Pipeline.create(options);
>
>
> List<String> topics =
> Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));
>
> PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
> .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
> .withTopics(topics)
> //.withConsumerFactoryFn(new CafintechConsumerFactoryFn())
> .withKeyCoder(VoidCoder.of())
> .withValueCoder(StringUtf8Coder.of())
> .withKeyDeserializer(VoidDeserializer.class)
> .withValueDeserializer(StringDeserializer.class)
> .withoutMetadata()
> ).apply(Values.create());
>
> rawData.apply(ParDo.of(SimpleViewDoFn.of(true))); //simply print
> each element of rawData. Able to run normally   ①
> PCollectionTuple results = rawData.apply("logAnatomyTest",
>//   ②
> ParDo.of(
> new DoFn<String, String>() {
> @ProcessElement
> public void process(ProcessContext c) {
> String element = c.element();
> System.out.println(""+element);
> if (!element.equals("EOF")) {
> c.output(c.element());
> }
> }
> }
> ).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag))
> );
> p.run().waitUntilFinish();
>}
> }
>
> In the previous code, the line commented with ① runs normally, but from ②
> I can't get anything.
>
> here is my beam version
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-core</artifactId>
>   <version>0.7.0-SNAPSHOT</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-direct-java</artifactId>
>   <version>0.7.0-SNAPSHOT</version>
>   <scope>runtime</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>0.7.0-SNAPSHOT</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-spark</artifactId>
>   <version>0.7.0-SNAPSHOT</version>
> </dependency>
>
>
> someone please help me.
>
>
>
> Sent from Mailbird
>
> On 2017/4/28 4:43:23, Aviem Zur <aviem...@gmail.com> wrote:
> Yes. Spark streaming support is still experimental and this issue exists
> in Beam 0.6.0
>
> This has since been fixed and the fix will be a part of the upcoming
> release.
>
> Since this isn't the first time a user has encountered this I've created a
> JIRA ticket for better visibility for this issue:
> https://issues.apache.org/jira/browse/BEAM-2106
>
> Thanks for reaching out! Please feel free to try out your pipeline using
> Beam master branch or one of the nightly SNAPSHOT builds.
>
> On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498...@qq.com> wrote:
>
>> Here is my maven configuration, thank you.
>>
>> 
>>   org.apache.beam
>>   beam-sdks-java-core
>>   0.6.0
>> 
>> 
>>   org.apache.beam
>>   beam-runners-direct-j

Re: Beam Slack channel

2017-06-25 Thread Aviem Zur
Done

On Sun, Jun 25, 2017 at 4:51 PM Aleksandr  wrote:

> Hello,
> Can someone  please add me to the slack channel?
>
> Best regards
> Aleksandr Gortujev.
>
>


Re: No TransformEvaluator registered error

2017-05-30 Thread Aviem Zur
Hi Antony,

Unbounded views are not currently supported in the Spark runner. The
following ticket tracks the progress for adding support for this:
https://issues.apache.org/jira/browse/BEAM-2112
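
For illustration, here is a minimal, hypothetical sketch of the shape of
pipeline that hits this translation error: building a side-input view (which
goes through View$CreatePCollectionView, the transform named in the stack
trace) from an unbounded source. It assumes a Beam version where
GenerateSequence is available; the class and values are made up for the
example.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class UnboundedViewSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // GenerateSequence without .to() produces an UNBOUNDED PCollection.
    PCollection<Long> ticks =
        p.apply(GenerateSequence.from(0))
         .apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))));

    // Turning an unbounded PCollection into a side-input view is the step the
    // Spark runner cannot translate yet (tracked by BEAM-2112).
    PCollectionView<Iterable<Long>> view = ticks.apply(View.asIterable());

    p.run().waitUntilFinish();
  }
}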

On Tue, May 30, 2017 at 7:11 PM Jean-Baptiste Onofré 
wrote:

> Hi Antony,
>
> which version of the Spark runner are you using ?
>
> Regards
> JB
>
> On 05/30/2017 06:04 PM, Antony Mayi wrote:
> > Hi,
> >
> > started getting following error when running on spark. can anyone see
> what I
> > could be doing wrong? It only happens with unbounded source. Same
> pipeline with
> > bounded source runs fine.
> >
> > thx,
> > a.
> >
> > java.lang.IllegalStateException: No TransformEvaluator registered for
> UNBOUNDED
> > transform class org.apache.beam.sdk.transforms.View$CreatePCollectionView
> > at
> >
> org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:518)
> > at
> >
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded(StreamingTransformTranslator.java:529)
> > at
> >
> org.apache.beam.runners.spark.SparkRunner$Evaluator.translate(SparkRunner.java:435)
> > at
> >
> org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:405)
> > at
> >
> org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:395)
> > at
> >
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:491)
> > at
> >
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
> > at
> >
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
> > at
> >
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
> > at
> >
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
> > at
> >
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:235)
> > at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:210)
> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:409)
> > at
> >
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
> > at
> >
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:47)
> > at
> >
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:776)
> > at
> >
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$10.apply(JavaStreamingContext.scala:775)
> > at scala.Option.getOrElse(Option.scala:120)
> > at
> >
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
> > at
> >
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:775)
> > at
> >
> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
> > at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:155)
> > at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:85)
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Slack

2017-09-14 Thread Aviem Zur
Invitation sent

On Thu, Sep 14, 2017 at 2:14 PM Saiguang Che  wrote:

> Hi,
>
> Could you please add me to the Slack channel? Thanks!
>
> Saiguang
>


Re: Spark and Beam

2017-09-27 Thread Aviem Zur
 main(final String[] args) {
>>>>>> final Pipeline pipeline = 
>>>>>> Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>>>>
>>>>>> pipeline.apply(GenerateSequence.from(0).to(10L))
>>>>>> .apply(ParDo.of(new DoFn<Integer, Product>() {
>>>>>> @ProcessElement
>>>>>> public void onElement(final ProcessContext context) {
>>>>>> final int i = context.element();
>>>>>> context.output(new Product(i, "Product #" + i));
>>>>>> }
>>>>>> }));
>>>>>>
>>>>>> pipeline.run();
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Then it is just a matter of having the beam dependency matching your 
>>>>>> runner (target environment). For testing the direct runner is enough but 
>>>>>> to run on spark you will need to import the spark one as dependency.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>> <https://blog-rmannibucau.rhcloud.com> | Old Blog
>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>> <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
>>>>>> <https://javaeefactory-rmannibucau.rhcloud.com>
>>>>>>
>>>>>> 2017-09-26 11:02 GMT+02:00 tal m <tal.m...@gmail.com>:
>>>>>>
>>>>>>> HI
>>>>>>> i looked at the links you sent me, and i haven't found any clue how
>>>>>>> to adapt it to my current code.
>>>>>>> my code is very simple:
>>>>>>>
>>>>>>> val sc = spark.sparkContext
>>>>>>>
>>>>>>> val productsNum = 10
>>>>>>> println(s"Saving $productsNum products RDD to the space")
>>>>>>> val rdd = sc.parallelize(1 to productsNum).map { i =>
>>>>>>>   Product(i, "Description of product " + i, Random.nextInt(10), 
>>>>>>> Random.nextBoolean())
>>>>>>> }
>>>>>>>
>>>>>>> is that simple to use beam instead of SparkContext ? i'm not familiar 
>>>>>>> with Spark at all so i have no idea what is Spark runner and how can i 
>>>>>>> use it in my case, just need to make it work :).
>>>>>>>
>>>>>>> Thanks Tal
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 26, 2017 at 11:57 AM, Aviem Zur <aviem...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Tal,
>>>>>>>>
>>>>>>>> Thanks for reaching out!
>>>>>>>>
>>>>>>>> Please take a look at our documentation:
>>>>>>>>
>>>>>>>> Quickstart guide (Java):
>>>>>>>> https://beam.apache.org/get-started/quickstart-java/
>>>>>>>> This guide will show you how to run our wordcount example using
>>>>>>>> each any of the runners (For example, direct runner or Spark runner in 
>>>>>>>> your
>>>>>>>> case).
>>>>>>>>
>>>>>>>> More reading:
>>>>>>>> Programming guide:
>>>>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>>>> Spark runner: https://beam.apache.org/documentation/runners/spark/
>>>>>>>>
>>>>>>>> Please let us know if you have further questions, and good luck
>>>>>>>> with your first try of Beam!
>>>>>>>>
>>>>>>>> Aviem.
>>>>>>>>
>>>>>>>> On Tue, Sep 26, 2017 at 11:47 AM tal m <tal.m...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> hi
>>>>>>>>> i'm new at Spark and also at beam.
>>>>>>>>> currently i have Java code that use Spark from reading some data
>>>>>>>>> from DB.
>>>>>>>>> my Spark code using SparkSession.builder (.) and also
>>>>>>>>> sparkContext.
>>>>>>>>> how can i make beam work similar to my current code, i just want
>>>>>>>>> make it work for now.
>>>>>>>>> Thanks Tal
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Slack Channel

2017-08-28 Thread Aviem Zur
Invitation sent.

On Mon, Aug 28, 2017 at 3:31 PM Sobhan Badiozamany <
sobhan.badiozam...@leovegas.com> wrote:

> Hi,
>
> Could you please add me to the slack channel?
>
> Thanks,
> Sobi
>


Re: [INFO] Spark runner updated to Spark 2.2.1

2017-12-18 Thread Aviem Zur
Nice!

On Mon, Dec 18, 2017 at 12:51 PM Jean-Baptiste Onofré 
wrote:

> Hi all,
>
> We are pleased to announce that Spark 2.x support in Spark runner has been
> merged this morning. It supports Spark 2.2.1.
>
> In the same PR, we did update to Scala 2.11, including Flink artifacts
> update to
> 2.11 (it means it's already ready to upgrade to Flink 1.4 !).
>
> It also means, as planned, that Spark 2.x support will be included in next
> Beam
> 2.3.0 release.
>
> Now, we are going to work on improvements in the Spark runner.
>
> If you have any issue with the Spark runner, please let us know.
>
> Thanks !
> Regards
> JB
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Slack invitation

2018-02-05 Thread Aviem Zur
Invitation sent.

On Mon, Feb 5, 2018 at 1:43 PM Jose Ignacio Honrado Benítez <
jihonra...@gmail.com> wrote:

> Hello,
>
> I would like to get invited to the Apache Beam Slack.
>
> Thanks in advance!
>
> Cheers
>


Re: Slack invite please

2018-02-13 Thread Aviem Zur
Invitation sent

On Tue, Feb 13, 2018 at 5:40 PM Sanjeev Nutan  wrote:

> Hi fellow Beamers, please could I have a invite to the slack channel?
> Thanks.
>


Re: slack invite pls

2018-02-13 Thread Aviem Zur
Invitation sent

On Tue, Feb 13, 2018 at 5:43 PM ramesh krishnan muthusamy <
ramkrish1...@gmail.com> wrote:

> Hi
>
>  please could I have a invite to the slack channel?
>
> -Ramesh
>