On 2017-09-05 19:01, Jean-Baptiste Onofré <[email protected]> wrote:
> Hi guys
>
> I created PR #3808:
>
> https://github.com/apache/beam/pull/3808
>
> It introduces a new Spark 2.x specific runner (in addition to the Spark 1.x
> one).
>
> I have some questions (more for dev) that I left in a comment.
>
> It's still a work in progress as I have to fix unit tests and discuss the
> ValidatesRunner tests. However, you can already take a look.
>
> Regards
> JB
>
> On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
> > Sure, thanks very much JB, I look forward to your link.
> >
> > -----Original Message-----
> > From: Jean-Baptiste Onofré [mailto:[email protected]]
> > Sent: Monday, September 4, 2017 8:59 AM
> > To: [email protected]
> > Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >
> > Hi,
> >
> > Actually, I'm preparing the PR. I will send the PR link to you.
> >
> > Just give me some time to rebase my branch and push.
> >
> > Thanks,
> > Regards
> > JB
> >
> > On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
> >> Hi JB,
> >>
> >> If possible, could you please send me the code/location to download the
> >> Spark Runner for Spark 2.x?
> >>
> >> Thanks,
> >> Mahender
> >>
> >> -----Original Message-----
> >> From: Jean-Baptiste Onofré [mailto:[email protected]]
> >> Sent: Friday, September 1, 2017 1:54 AM
> >> To: [email protected]
> >> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>
> >> Sure, I will send the PR during the weekend. I will let you know.
> >>
> >> Regards
> >> JB
> >>
> >> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> >>> Thanks JB. Could you please point me to the location of the Spark Runner
> >>> specific to Spark 2.x, or is this something that is part of the configuration?
> >>>
> >>> -----Original Message-----
> >>> From: Jean-Baptiste Onofré [mailto:[email protected]]
> >>> Sent: Thursday, August 31, 2017 12:11 AM
> >>> To: [email protected]
> >>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>
> >>> Hi,
> >>>
> >>> I'm working on a Spark runner specific to Spark 2.x, as the API changed.
> >>>
> >>> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
> >>>> Hello,
> >>>>
> >>>> I am running into a Spark assertion error when running an Apache Beam
> >>>> pipeline; the details are below:
> >>>>
> >>>> Apache Beam version: 2.1.0
> >>>> Spark version: 2.1.0
> >>>>
> >>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset
> >>>> must return a zero value copy
> >>>>     at scala.Predef$.assert(Predef.scala:179)
> >>>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >>>>
> >>>> Can you please let me know whether the Apache Beam v2.1.0 Spark runner
> >>>> is compatible with Spark v2.1.0?
> >>>>
> >>>> Below is the code snippet for the pipeline:
> >>>>
> >>>> PipelineOptionsFactory.register(CSVOptions.class);
> >>>> CSVOptions options =
> >>>>     PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
> >>>> options.setRunner(SparkRunner.class);
> >>>> options.setSparkMaster("local[4]");
> >>>> options.setEnableSparkMetricSinks(false);
> >>>>
> >>>> Pipeline p = Pipeline.create(options);
> >>>>
> >>>> p.apply("ReadMyCSVFile",
> >>>>         TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
> >>>>  .apply(new DataLoader())
> >>>>  .apply(JdbcIO.<String>write()
> >>>>      .withDataSourceConfiguration(
> >>>>          JdbcIO.DataSourceConfiguration.create(
> >>>>                  "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
> >>>>              .withUsername("postgres")
> >>>>              .withPassword("postgres"))
> >>>>      .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
> >>>>      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
> >>>>          public void setParameters(String element, PreparedStatement query)
> >>>>                  throws SQLException {
> >>>>              String[] datas = element.split("\t");
> >>>>              if (datas.length > 0) {
> >>>>                  for (int j = 0; j < datas.length; j++) {
> >>>>                      query.setString(j + 1, datas[j]);
> >>>>                  }
> >>>>              }
> >>>>          }
> >>>>      }));
> >>>>
> >>>> SparkRunner runner = SparkRunner.create(options);
> >>>> runner.run(p).waitUntilFinish();
> >>>>
> >>>> Any help would be greatly appreciated.
> >>>>
> >>>> Thanks,
> >>>> Mahender
> >>>>
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> [email protected]
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
> >>
> >
>

Hi JB,

I have tried this PR and I still see the same error:

Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy

Is the change from Iterables to Iterators supposed to fix this issue, or am I missing something else? Please confirm.
Thanks in advance,
Suri
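For readers hitting the same trace: the assertion is raised in Spark's AccumulatorV2.writeReplace, the serialization hook, which checks that copyAndReset() returns an accumulator whose isZero() is true. The self-contained Java model below is a sketch, not Spark's or Beam's code; ZeroCopyCheck, ModelAccumulator, CountsNoEquals and CountsWithEquals are hypothetical names. Assuming isZero() is decided by comparing the current value with a freshly built zero value through equals(), it shows one way the check can fail: if the value type does not override equals(), even a freshly reset copy never compares as zero. It is an illustration of the contract, not a diagnosis of the Beam 2.1.0 runner.

```java
import java.util.function.Supplier;

public class ZeroCopyCheck {

    // Hypothetical value type that does NOT override equals(): two freshly
    // created "empty" instances are never equal (identity comparison).
    static final class CountsNoEquals {
        long count;
    }

    // The same hypothetical value type with value-based equals()/hashCode().
    static final class CountsWithEquals {
        long count;

        @Override
        public boolean equals(Object o) {
            return o instanceof CountsWithEquals && ((CountsWithEquals) o).count == count;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(count);
        }
    }

    // Simplified model (NOT Spark's AccumulatorV2) of an accumulator whose
    // isZero() compares its value with a freshly built zero value via equals().
    static final class ModelAccumulator<T> {
        private final Supplier<T> zero;
        private final T value;

        ModelAccumulator(Supplier<T> zero) {
            this.zero = zero;
            this.value = zero.get();
        }

        boolean isZero() {
            return value.equals(zero.get());
        }

        // Returns a new accumulator holding a freshly built zero value.
        ModelAccumulator<T> copyAndReset() {
            return new ModelAccumulator<T>(zero);
        }
    }

    // Mirrors the check reported in the stack trace:
    // "copyAndReset must return a zero value copy".
    static boolean passesZeroCopyCheck(ModelAccumulator<?> acc) {
        return acc.copyAndReset().isZero();
    }

    public static void main(String[] args) {
        System.out.println(passesZeroCopyCheck(
                new ModelAccumulator<CountsNoEquals>(CountsNoEquals::new)));   // false
        System.out.println(passesZeroCopyCheck(
                new ModelAccumulator<CountsWithEquals>(CountsWithEquals::new))); // true
    }
}
```

Running main prints false for the value type without equals() and true for the one with value-based equality, which is the distinction the assertion message is about.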
