To my knowledge you should use Spark 1.6.3, since that is what is declared as the spark.version property in the project's root pom.xml.
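Note that the AssertionError in your trace is thrown from org.apache.spark.util.AccumulatorV2, a class that only exists in Spark 2.x, which is consistent with a runner built against Spark 1.6 being run on a Spark 2.1 installation. As a rough sketch (property name taken from the Beam root pom.xml; please verify against the actual 2.1.0 release tag), the declaration looks like:

    <properties>
      <spark.version>1.6.3</spark.version>
    </properties>

and in your own project you would align your provided Spark dependencies with that version, e.g. (assuming the Scala 2.10 artifacts that the Spark 1.6.x line ships by default):

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.3</version>
      <scope>provided</scope>
    </dependency>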
On Wed, Aug 30, 2017 at 2:45 PM, Mahender Devaruppala <[email protected]> wrote:

> Hello,
>
> I am running into a Spark assertion error when running an Apache Beam
> pipeline; below are the details:
>
> Apache Beam version: 2.1.0
> Spark version: 2.1.0
>
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>         at scala.Predef$.assert(Predef.scala:179)
>         at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
> Can you please let me know if the Apache Beam v2.1.0 Spark runner is
> compatible with Spark v2.1.0?
>
> Below is the code snippet for the pipeline:
>
> PipelineOptionsFactory.register(CSVOptions.class);
> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
> options.setRunner(SparkRunner.class);
> options.setSparkMaster("local[4]");
> options.setEnableSparkMetricSinks(false);
>
> Pipeline p = Pipeline.create(options);
> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>  .apply(new DataLoader())
>  .apply(JdbcIO.<String>write()
>      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>              "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>          .withUsername("postgres").withPassword("postgres"))
>      .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>          public void setParameters(String element, PreparedStatement query) throws SQLException {
>              String[] datas = element.split("\t");
>              if (datas.length > 0) {
>                  for (int j = 0; j < datas.length; j++) {
>                      query.setString(j + 1, datas[j]);
>                  }
>              }
>          }
>      }));
>
> SparkRunner runner = SparkRunner.create(options);
> runner.run(p).waitUntilFinish();
>
> Any help would be greatly appreciated.
>
> Thanks,
> Mahender
