Thanks JB.  Could you please point me to the location of Spark Runner specific 
to Spark 2.x or is this something part any configurations?

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]] 
Sent: Thursday, August 31, 2017 12:11 AM
To: [email protected]
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Hi,

I'm working on a Spark runner specific to Spark 2.x as the API changed.

So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.

Regards
JB

On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
> Hello,
> 
> I am running into spark assertion error when running a apache pipeline 
> and below are the details:
> 
> Apache Beam version: 2.1.0
> 
> Spark version: 2.1.0
> 
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset 
> must return a zero value copy
> 
>                 at scala.Predef$.assert(Predef.scala:179)
> 
>                 at
> org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:1
> 62)
> 
>                 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> 
>                 at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown 
> Source)
> 
>                 at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> 
> Can you please let me know if Apache beam v2.1.0 Spark runner is 
> compatible to work with Spark v2.1.0?
> 
> Below is the code snippet for the pipeline:
> 
>         PipelineOptionsFactory./register/(CSVOptions.*class*);
> 
>               CSVOptions options=
> PipelineOptionsFactory./fromArgs/(args).withValidation().as(CSVOptions
> .*class*);
> 
> options.setRunner(SparkRunner.*class*);
> 
> options.setSparkMaster("local[4]");
> 
> options.setEnableSparkMetricSinks(*false*);
> 
>               Pipeline p= Pipeline./create/(options);
> 
> p.apply("ReadMyCSVFile",
> TextIO./read/().from(URIUtil./getFromPath/(options.getInputFile())))
> 
>               .apply(*new*DataLoader())
> 
>               
> .apply(JdbcIO.<String>/write/().withDataSourceConfiguration
> 
>         
> (JdbcIO.DataSourceConfiguration./create/("org.postgresql.Driver","jdbc
> :postgresql://localhost:5432/beam")
> 
>                          
> .withUsername("postgres").withPassword("postgres")).withStatement("ins
> ert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
> 
>                             
> .withPreparedStatementSetter(*new*_JdbcIO.PreparedStatementSetter<Stri
> ng>()_ {
> 
> *public**void*setParameters(String element, PreparedStatement query) 
> *throws*SQLException {
> 
>                                          String[] datas= 
> element.split("\t");
> 
> *if*(datas.length>0) {
> 
> *for*(*int*j=0 ; j<datas.length;j++){
> 
> query.setString(j+1, datas[j]);
> 
>                                                 }
> 
>                                          }
> 
> }
> 
>                 }));
> 
>               SparkRunner runner= SparkRunner./create/(options);
> 
> runner.run(p).waitUntilFinish();
> 
> Any help would be greatly appreciated.
> 
> Thanks,
> 
> Mahender
> 

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to