Hi,
I'm working on a Spark runner specific to Spark 2.x as the API changed.
So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
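For context, the assertion in the trace below is raised by Spark 2.x when it serializes an accumulator: it calls copyAndReset() on it and requires the returned copy to report isZero(). Here is a minimal plain-Java sketch of that contract. It is only an illustration, not the actual Spark or Beam code: MiniAccumulator, ListAccumulator, BrokenAccumulator, Holder and passesZeroCopyCheck are hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the accumulator contract; NOT Spark's real AccumulatorV2.
final class AccumulatorContractSketch {

    /** Hypothetical miniature of the accumulator interface. */
    abstract static class MiniAccumulator {
        abstract boolean isZero();
        abstract MiniAccumulator copy();
        abstract void reset();

        // Default behaviour: copy the accumulator, then reset the copy.
        MiniAccumulator copyAndReset() {
            MiniAccumulator c = copy();
            c.reset();
            return c;
        }
    }

    /** Well-behaved: reset() empties the list and isZero() reports that. */
    static class ListAccumulator extends MiniAccumulator {
        final List<String> items = new ArrayList<>();

        boolean isZero() { return items.isEmpty(); }

        MiniAccumulator copy() {
            ListAccumulator c = new ListAccumulator();
            c.items.addAll(items);
            return c;
        }

        void reset() { items.clear(); }
    }

    /** Value type without an equals() override, so equals() is identity-based. */
    static final class Holder { int count; }

    /** Misbehaving: isZero() compares against a fresh Holder, which is never equal. */
    static class BrokenAccumulator extends MiniAccumulator {
        Holder value = new Holder();

        boolean isZero() { return value.equals(new Holder()); }

        MiniAccumulator copy() {
            BrokenAccumulator c = new BrokenAccumulator();
            c.value.count = value.count;
            return c;
        }

        void reset() { value = new Holder(); }
    }

    /** Mirrors the condition behind "copyAndReset must return a zero value copy". */
    static boolean passesZeroCopyCheck(MiniAccumulator acc) {
        return acc.copyAndReset().isZero();
    }

    public static void main(String[] args) {
        ListAccumulator good = new ListAccumulator();
        good.items.add("x");
        System.out.println(passesZeroCopyCheck(good));                    // true
        System.out.println(passesZeroCopyCheck(new BrokenAccumulator())); // false
    }
}
```

An accumulator written against the older API can end up in the second situation when its "zero" test does not hold after a reset, which is one plausible way to hit this assertion on Spark 2.x.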
Regards
JB
On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
Hello,
I am running into a Spark assertion error when running an Apache Beam pipeline.
The details are below:
Apache Beam version: 2.1.0
Spark version: 2.1.0
Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
Could you please let me know whether the Apache Beam v2.1.0 Spark runner is
compatible with Spark v2.1.0?
Below is the code snippet for the pipeline:
PipelineOptionsFactory.register(CSVOptions.class);
CSVOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[4]");
options.setEnableSparkMetricSinks(false);
Pipeline p = Pipeline.create(options);
p.apply("ReadMyCSVFile",
        TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
    .apply(new DataLoader())
    .apply(JdbcIO.<String>write()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration
                .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
                .withUsername("postgres")
                .withPassword("postgres"))
        .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
            public void setParameters(String element, PreparedStatement query)
                    throws SQLException {
                String[] datas = element.split("\t");
                if (datas.length > 0) {
                    for (int j = 0; j < datas.length; j++) {
                        query.setString(j + 1, datas[j]);
                    }
                }
            }
        }));
SparkRunner runner = SparkRunner.create(options);
runner.run(p).waitUntilFinish();
Any help would be greatly appreciated.
Thanks,
Mahender
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com