Hmmm weird, let me check.

Regards
JB

On 09/07/2017 09:03 PM, [email protected] wrote:


On 2017-09-05 19:01, Jean-Baptiste Onofré <[email protected]> wrote:
Hi guys

I created PR #3808:

https://github.com/apache/beam/pull/3808

It introduces a new Spark 2.x specific runner (in addition to the Spark 1.x
one).

I have some questions (more for the dev list) that I left in a comment.

It's still a work in progress, as I have to fix the unit tests and discuss the
ValidatesRunner tests. However, you can already take a look.

Regards
JB

On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
Sure, thanks very much JB, will look forward to your link.

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]]
Sent: Monday, September 4, 2017 8:59 AM
To: [email protected]
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Hi,

Actually, I'm preparing the PR. I will send the PR link to you.

Give me just some time to rebase my branch and push.

Thanks,
Regards
JB

On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
Hi JB,

If possible, could you please send me the code/location to download Spark 
Runner for Spark 2.x?

Thanks,
Mahender

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]]
Sent: Friday, September 1, 2017 1:54 AM
To: [email protected]
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Sure, I will send the PR during the weekend. I will let you know.

Regards
JB

On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
Thanks, JB. Could you please point me to the location of the Spark Runner specific
to Spark 2.x, or is this part of some configuration?

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:[email protected]]
Sent: Thursday, August 31, 2017 12:11 AM
To: [email protected]
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Hi,

I'm working on a Spark runner specific to Spark 2.x as the API changed.

So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.

Regards
JB

On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
Hello,

I am running into a Spark assertion error when running an Apache Beam
pipeline; below are the details:

Apache Beam version: 2.1.0

Spark version: 2.1.0

Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
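
For context, this assertion comes from Spark 2.x's AccumulatorV2 API: when an accumulator is serialized, writeReplace() calls copyAndReset() and asserts that the result reports isZero() == true. A minimal sketch of an accumulator that satisfies that contract is below; the SumAccumulator name and its long-sum semantics are illustrative only, not code from Beam or from this thread.

    import org.apache.spark.util.AccumulatorV2;

    // Illustrative accumulator satisfying the Spark 2.x contract:
    // copy() followed by reset() must yield a state where isZero() is true.
    public class SumAccumulator extends AccumulatorV2<Long, Long> {

      private long sum = 0L;

      @Override
      public boolean isZero() {
        return sum == 0L;               // must hold right after reset()
      }

      @Override
      public AccumulatorV2<Long, Long> copy() {
        SumAccumulator acc = new SumAccumulator();
        acc.sum = this.sum;
        return acc;
      }

      @Override
      public void reset() {
        sum = 0L;
      }

      @Override
      public void add(Long v) {
        sum += v;
      }

      @Override
      public void merge(AccumulatorV2<Long, Long> other) {
        sum += other.value();
      }

      @Override
      public Long value() {
        return sum;
      }
    }

The default copyAndReset() copies the accumulator and resets the copy, so the assertion passes as long as reset() really brings the state back to the isZero() condition. The runner shipped in Beam 2.1.0 targets Spark 1.x (as noted above in this thread), whose accumulator API predates this contract, which appears to be why the assertion fires when it is run on Spark 2.1.0.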

Can you please let me know if the Apache Beam v2.1.0 Spark runner is
compatible with Spark v2.1.0?

Below is the code snippet for the pipeline:

    PipelineOptionsFactory.register(CSVOptions.class);

    CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
    options.setRunner(SparkRunner.class);
    options.setSparkMaster("local[4]");
    options.setEnableSparkMetricSinks(false);

    Pipeline p = Pipeline.create(options);

    p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
        .apply(new DataLoader())
        .apply(JdbcIO.<String>write()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
                .withUsername("postgres").withPassword("postgres"))
            .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
              public void setParameters(String element, PreparedStatement query) throws SQLException {
                String[] datas = element.split("\t");
                if (datas.length > 0) {
                  for (int j = 0; j < datas.length; j++) {
                    query.setString(j + 1, datas[j]);
                  }
                }
              }
            }));

    SparkRunner runner = SparkRunner.create(options);
    runner.run(p).waitUntilFinish();
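
A side note on how the pipeline is launched, not a fix for the assertion above: since the runner is already configured with options.setRunner(SparkRunner.class), Pipeline.run() should resolve it from the options, so constructing a SparkRunner explicitly is not required. A minimal sketch under the same options as above:

    // The runner set via options.setRunner(SparkRunner.class) is resolved here.
    p.run().waitUntilFinish();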

Any help would be greatly appreciated.

Thanks,

Mahender


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

Hi JB,

I have tried this PR and I still see the same error, i.e. Caused by:
java.lang.AssertionError: assertion failed: copyAndReset must return a zero
value copy.
Is the change from Iterables to Iterators supposed to fix this issue, or am I
missing something else? Please confirm.

Thanks in advance.
Suri


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
