On 2017-09-08 11:39, Jean-Baptiste Onofré <[email protected]> wrote: 
> Hmmm weird, let me check.
> 
> Regards
> JB
> 
> On 09/07/2017 09:03 PM, [email protected] wrote:
> > 
> > 
> > On 2017-09-05 19:01, Jean-Baptiste Onofré <[email protected]> wrote:
> >> Hi guys
> >>
> >> I created PR #3808:
> >>
> >> https://github.com/apache/beam/pull/3808
> >>
> >> It introduces a new Spark 2.x specific runner (in addition to the Spark 
> >> 1.x one).
> >>
> >> I have some questions (more for dev) that I left in a comment.
> >>
> >> It's still a work in progress, as I have to fix the unit tests and discuss 
> >> the validate runner test. However, you can already take a look.
> >>
> >> Regards
> >> JB
> >>
> >> On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
> >>> Sure, thanks very much JB, will look forward to your link.
> >>>
> >>> -----Original Message-----
> >>> From: Jean-Baptiste Onofré [mailto:[email protected]]
> >>> Sent: Monday, September 4, 2017 8:59 AM
> >>> To: [email protected]
> >>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>
> >>> Hi,
> >>>
> >>> Actually, I'm preparing the PR. I will send the PR link to you.
> >>>
> >>> Give me just some time to rebase my branch and push.
> >>>
> >>> Thanks,
> >>> Regards
> >>> JB
> >>>
> >>> On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
> >>>> Hi JB,
> >>>>
> >>>> If possible, could you please send me the code, or the location from 
> >>>> which to download the Spark Runner for Spark 2.x?
> >>>>
> >>>> Thanks,
> >>>> Mahender
> >>>>
> >>>> -----Original Message-----
> >>>> From: Jean-Baptiste Onofré [mailto:[email protected]]
> >>>> Sent: Friday, September 1, 2017 1:54 AM
> >>>> To: [email protected]
> >>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>>
> >>>> Sure, I will send the PR during the weekend. I will let you know.
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> >>>>> Thanks JB.  Could you please point me to the location of the Spark Runner 
> >>>>> specific to Spark 2.x, or is this part of some configuration?
> >>>>>
> >>>>> -----Original Message-----
> >>>>> From: Jean-Baptiste Onofré [mailto:[email protected]]
> >>>>> Sent: Thursday, August 31, 2017 12:11 AM
> >>>>> To: [email protected]
> >>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I'm working on a Spark runner specific to Spark 2.x as the API changed.
> >>>>>
> >>>>> So, for now, I have two runners: one for Spark 1.x and one for Spark 
> >>>>> 2.x.
> >>>>>
> >>>>> Regards
> >>>>> JB
> >>>>>
> >>>>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
> >>>>>> Hello,
> >>>>>>
> >>>>>> I am running into a Spark assertion error when running an Apache Beam
> >>>>>> pipeline. The details are below:
> >>>>>>
> >>>>>> Apache Beam version: 2.1.0
> >>>>>>
> >>>>>> Spark version: 2.1.0
> >>>>>>
> >>>>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> >>>>>>     at scala.Predef$.assert(Predef.scala:179)
> >>>>>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
> >>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >>>>>>
> >>>>>> Can you please let me know if the Apache Beam v2.1.0 Spark runner is
> >>>>>> compatible with Spark v2.1.0?
> >>>>>>
> >>>>>> Below is the code snippet for the pipeline:
> >>>>>>
> >>>>>> PipelineOptionsFactory.register(CSVOptions.class);
> >>>>>>
> >>>>>> CSVOptions options =
> >>>>>>     PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
> >>>>>> options.setRunner(SparkRunner.class);
> >>>>>> options.setSparkMaster("local[4]");
> >>>>>> options.setEnableSparkMetricSinks(false);
> >>>>>>
> >>>>>> Pipeline p = Pipeline.create(options);
> >>>>>>
> >>>>>> p.apply("ReadMyCSVFile",
> >>>>>>         TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
> >>>>>>     .apply(new DataLoader())
> >>>>>>     .apply(JdbcIO.<String>write()
> >>>>>>         .withDataSourceConfiguration(
> >>>>>>             JdbcIO.DataSourceConfiguration
> >>>>>>                 .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
> >>>>>>                 .withUsername("postgres")
> >>>>>>                 .withPassword("postgres"))
> >>>>>>         .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
> >>>>>>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
> >>>>>>             public void setParameters(String element, PreparedStatement query)
> >>>>>>                     throws SQLException {
> >>>>>>                 String[] datas = element.split("\t");
> >>>>>>                 if (datas.length > 0) {
> >>>>>>                     for (int j = 0; j < datas.length; j++) {
> >>>>>>                         query.setString(j + 1, datas[j]);
> >>>>>>                     }
> >>>>>>                 }
> >>>>>>             }
> >>>>>>         }));
> >>>>>>
> >>>>>> SparkRunner runner = SparkRunner.create(options);
> >>>>>> runner.run(p).waitUntilFinish();
> >>>>>>
> >>>>>> Any help would be greatly appreciated.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Mahender
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> Jean-Baptiste Onofré
> >>>>> [email protected]
> >>>>> http://blog.nanthrax.net
> >>>>> Talend - http://www.talend.com
> >>>>>
> >>>>
> >>>
> >>
> > Hi JB,
> > I have tried this PR and I still see the same error, i.e. Caused by: 
> > java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
> > value copy
> > Is the change from Iterables to Iterators supposed to fix this issue, 
> > or am I missing something else?
> > Please confirm.
> > 
> > Thanks in Advance.
> > Suri
> > 
> 
Hi JB,
As discussed here:
https://stackoverflow.com/questions/42336251/assertionerror-assertion-failed-copyandreset-must-return-a-zero-value-copy
and https://github.com/bigdatagenomics/adam/issues/1021#issuecomment-216283222

Does spark-core need to be changed? Can you please confirm?

Thanks in advance,
Suri
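
For context, here is a minimal sketch of the invariant behind the "copyAndReset must return a zero value copy" message. The stack trace shows the assertion firing in AccumulatorV2.writeReplace, which checks that the accumulator returned by copyAndReset() reports isZero() as true. The classes below are hypothetical stand-ins written for illustration only. They are not Spark's or Beam's actual classes, and the sketch does not establish what triggers the failure in the Beam Spark runner.

```java
import java.util.ArrayList;
import java.util.List;

public class AccumulatorContractSketch {

    /** Simplified, hypothetical stand-in for the accumulator contract (not Spark's AccumulatorV2). */
    interface SketchAccumulator<T> {
        boolean isZero();
        SketchAccumulator<T> copy();
        void reset();
        void add(T value);

        // Default behaviour assumed here: copy the accumulator, then reset the copy.
        default SketchAccumulator<T> copyAndReset() {
            SketchAccumulator<T> c = copy();
            c.reset();
            return c;
        }
    }

    /** Well-behaved accumulator: reset() empties the state, so the reset copy is zero. */
    static class ListAccumulator implements SketchAccumulator<String> {
        protected final List<String> values = new ArrayList<>();

        public boolean isZero() { return values.isEmpty(); }

        public SketchAccumulator<String> copy() {
            ListAccumulator c = new ListAccumulator();
            c.values.addAll(values);
            return c;
        }

        public void reset() { values.clear(); }

        public void add(String value) { values.add(value); }
    }

    /** Misbehaving accumulator: reset() does nothing, so the reset copy still holds values. */
    static class BrokenAccumulator extends ListAccumulator {
        @Override
        public SketchAccumulator<String> copy() {
            BrokenAccumulator c = new BrokenAccumulator();
            c.values.addAll(values);
            return c;
        }

        @Override
        public void reset() { /* intentionally left empty to violate the contract */ }
    }

    /** Mirrors the condition named in the assertion message: the reset copy must be zero. */
    static boolean passesSerializationCheck(SketchAccumulator<?> acc) {
        return acc.copyAndReset().isZero();
    }

    public static void main(String[] args) {
        ListAccumulator good = new ListAccumulator();
        good.add("a");
        BrokenAccumulator bad = new BrokenAccumulator();
        bad.add("a");
        System.out.println("good passes: " + passesSerializationCheck(good));
        System.out.println("bad passes: " + passesSerializationCheck(bad));
    }
}
```

Any accumulator whose reset copy is not zero fails this check when it is serialized, which is why the links above look at which accumulator implementation ends up on the classpath.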
