Thanks JB. Could you please point me to the location of the Spark runner specific to Spark 2.x, or is this part of any configuration?
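[For context on the question above: in Beam, the Spark runner ships as its own Maven module rather than as a configuration switch. A sketch of the dependency block, with the caveat that the Spark version shown is an assumption — per JB's reply below, the runner bundled with Beam 2.1.0 targeted the Spark 1.x API line, not 2.x:]

```xml
<!-- Beam's Spark runner is a separate artifact, not a config option -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.1.0</version>
</dependency>
<!-- Spark itself is typically scoped "provided"; the 1.6.x version
     here is an assumption based on the API-change discussion below -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.3</version>
  <scope>provided</scope>
</dependency>
```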
-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]]
Sent: Thursday, August 31, 2017 12:11 AM
To: [email protected]
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Hi,

I'm working on a Spark runner specific to Spark 2.x, as the API changed. So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.

Regards
JB

On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
> Hello,
>
> I am running into a Spark assertion error when running an Apache Beam pipeline; below are the details:
>
> Apache Beam version: 2.1.0
> Spark version: 2.1.0
>
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>         at scala.Predef$.assert(Predef.scala:179)
>         at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
> Can you please let me know if the Apache Beam v2.1.0 Spark runner is compatible with Spark v2.1.0?
> Below is the code snippet for the pipeline:
>
> PipelineOptionsFactory.register(CSVOptions.class);
> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
> options.setRunner(SparkRunner.class);
> options.setSparkMaster("local[4]");
> options.setEnableSparkMetricSinks(false);
>
> Pipeline p = Pipeline.create(options);
> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>  .apply(new DataLoader())
>  .apply(JdbcIO.<String>write()
>      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>              "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>          .withUsername("postgres").withPassword("postgres"))
>      .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>          public void setParameters(String element, PreparedStatement query) throws SQLException {
>              String[] datas = element.split("\t");
>              if (datas.length > 0) {
>                  for (int j = 0; j < datas.length; j++) {
>                      query.setString(j + 1, datas[j]);
>                  }
>              }
>          }
>      }));
>
> SparkRunner runner = SparkRunner.create(options);
> runner.run(p).waitUntilFinish();
>
> Any help would be greatly appreciated.
>
> Thanks,
> Mahender

-- 
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
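[Aside on the setter loop in the quoted pipeline: JDBC parameter indices are 1-based, which is why column j of the split array binds to parameter j + 1. A minimal, self-contained sketch of that split-and-bind arithmetic — the class name SplitDemo is invented for illustration and is not part of the original pipeline:]

```java
import java.util.Arrays;

public class SplitDemo {
    // Mirrors element.split("\t") from the PreparedStatementSetter above.
    static String[] splitRow(String line) {
        return line.split("\t");
    }

    public static void main(String[] args) {
        String[] datas = splitRow("a\tb\tc");
        for (int j = 0; j < datas.length; j++) {
            // In the real setter this index feeds query.setString(j + 1, datas[j]),
            // because PreparedStatement parameters start at 1, not 0.
            int paramIndex = j + 1;
            System.out.println(paramIndex + " -> " + datas[j]);
        }
    }
}
```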
