Re: Java SPI jar reload in Spark

2017-06-07 Thread Ryan
I'd suggest scripts like JavaScript, Groovy, etc. To my understanding, the service loader mechanism isn't a good fit for runtime reloading.
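For instance, something along these lines (a minimal sketch only, using the standard javax.script API with the JDK 8 Nashorn engine; the script source and how you feed it to the job are up to you):

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

public class ScriptedRule {
    // Evaluate the business rule from plain script text. Because the logic is
    // just text, it can be re-read whenever your "reload" event fires, without
    // touching the executor classpath or the classes held in the checkpoint.
    public static Object apply(String scriptSource, Object input) throws ScriptException {
        ScriptEngine engine = new ScriptEngineManager().getEngineByName("nashorn");
        engine.put("input", input);
        return engine.eval(scriptSource);
    }
}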


Re: Java SPI jar reload in Spark

2017-06-07 Thread Jonnas Li(Contractor)
To be more explicit, I used mapWithState() in my application, just like this:

stream = KafkaUtils.createStream(..)
mappedStream = stream.mapPartitionsToPair(..)
stateStream = mappedStream.mapWithState(MyUpdateFunc(..))
stateStream.foreachRDD(..)

I call the jar in MyUpdateFunc(), and the jar reloading is triggered by some 
other event.
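Roughly, the plug-in call inside the update function looks like this (a sketch only; BusinessPlugin and PluginHolder are made-up names for illustration, not our real classes):

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical contract implemented by the third-party jar.
interface BusinessPlugin {
    String process(String key, String value, String state);
}

// The reload event swaps in a freshly loaded implementation; MyUpdateFunc()
// always goes through current(), so it picks up the newest one.
final class PluginHolder {
    private static final AtomicReference<BusinessPlugin> CURRENT = new AtomicReference<>();

    static void swap(BusinessPlugin reloaded) {
        CURRENT.set(reloaded);
    }

    static BusinessPlugin current() {
        return CURRENT.get();
    }
}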

I'm not sure whether this approach is feasible. To my understanding, Spark checkpoints the state, so the application can't be updated at runtime; I assume that's why I got the exception.

Any suggestion is welcome. If there is another way to do something like this, I'd be glad to hear it; I just want to provide an approach that lets users customize their business logic.

Regards
李忠双 / Jonnas


Re: Java SPI jar reload in Spark

2017-06-06 Thread Jonnas Li(Contractor)
I used java.util.ServiceLoader (https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html); the javadoc says it supports reloading.
Please point it out if I'm misunderstanding.
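For reference, this is roughly how the loading side looks (a simplified sketch; MyPlugin and the jar URL are placeholders, and the comment notes the caveat about reload() versus the class loader):

import java.net.URL;
import java.net.URLClassLoader;
import java.util.ServiceLoader;

// Hypothetical SPI interface implemented by the plug-in jar.
interface MyPlugin {
    String apply(String input);
}

public class PluginLoader {
    // ServiceLoader.reload() only clears the provider cache of an existing
    // loader; it does not pick up a changed jar on its own, because the
    // classes still come from the same class loader. To load a newer jar,
    // build a fresh URLClassLoader and a new ServiceLoader on top of it.
    public static ServiceLoader<MyPlugin> reloadFrom(URL pluginJar) {
        URLClassLoader cl = new URLClassLoader(new URL[] { pluginJar },
                Thread.currentThread().getContextClassLoader());
        return ServiceLoader.load(MyPlugin.class, cl);
    }
}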

Regards
Jonnas Li


Re: Java SPI jar reload in Spark

2017-06-06 Thread Alonso Isidoro Roman
Hi, a quick search on Google turned up:

https://github.com/spark-jobserver/spark-jobserver/issues/130

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


Re: Java SPI jar reload in Spark

2017-06-06 Thread Jonnas Li(Contractor)
Thanks for your quick response.
These jars define customized business logic and can be treated as plug-ins in our business scenario. The jars are developed and maintained by third-party partners, which means there will be version updates from time to time.
My expectation is to update the business code without restarting the Spark Streaming job; any suggestions would be greatly appreciated.

Regards
Jonnas Li


Re: Java SPI jar reload in Spark

2017-06-06 Thread Jörn Franke
Why do you need jar reloading? What functionality is executed during jar reloading? Maybe there is another way to achieve the same thing without jar reloading. In fact, it might be dangerous from a functional point of view: if the functionality in the jar changes, all your computation is wrong.

> On 6. Jun 2017, at 11:35, Jonnas Li(Contractor) <zhongshuang...@envisioncn.com> wrote:
> 
> I have a Spark Streaming application which dynamically calls a jar (Java SPI); the jar is called in a mapWithState() function, and it was working fine for a long time.
> Recently, I got a requirement to reload the jar at runtime.
> But when the reloading completes, the Spark Streaming job fails, and I get the following exception; it seems Spark fails to deserialize the checkpoint.
> My question is: will the logic in the jar be serialized into the checkpoint, and is it possible to reload the jar at runtime in Spark Streaming?
> 
> 
> [2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, 
> ip-10-21-14-205.envisioncn.com): java.lang.ClassCastException: cannot assign 
> instance of scala.collection.immutable.List$SerializationProxy to field 
> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type 
> scala.collection.Seq in instance of 
> org.apache.spark.streaming.rdd.MapWithStateRDD
>   at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>   at 
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>   at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
>   at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at