I used 
java.util.ServiceLoader<S><https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html>
  , as the javadoc says it supports reloading.
Please point it out if I'm mis-understanding.

Regards
Jonnas Li

发件人: Alonso Isidoro Roman <alons...@gmail.com<mailto:alons...@gmail.com>>
日期: 2017年6月6日 星期二 下午6:21
至: Zhongshuang Li 
<zhongshuang...@envisioncn.com<mailto:zhongshuang...@envisioncn.com>>
抄送: Jörn Franke <jornfra...@gmail.com<mailto:jornfra...@gmail.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
主题: Re: Java SPI jar reload in Spark

Hi, a quick search on google.

https://github.com/spark-jobserver/spark-jobserver/issues/130

<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-06-06 12:14 GMT+02:00 Jonnas Li(Contractor) 
<zhongshuang...@envisioncn.com<mailto:zhongshuang...@envisioncn.com>>:
Thank for your quick response.
These jars are used to define some customize business logic, and they can be 
treat as plug-ins in our business scenario. And the jars are developed/maintain 
by some third-party partners, this means there will be some version updating.
My expectation is update the business code with restarting the spark streaming 
job, any suggestion will be very grateful.

Regards
Jonnas Li

发件人: Jörn Franke <jornfra...@gmail.com<mailto:jornfra...@gmail.com>>
日期: 2017年6月6日 星期二 下午5:55
至: Zhongshuang Li 
<zhongshuang...@envisioncn.com<mailto:zhongshuang...@envisioncn.com>>
抄送: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
主题: Re: Java SPI jar reload in Spark

Why do you need jar reloading? What functionality is executed during jar 
reloading. Maybe there is another way to achieve the same without jar 
reloading. In fact, it might be dangerous from a functional point of view- 
functionality in jar changed and all your computation is wrong.

On 6. Jun 2017, at 11:35, Jonnas Li(Contractor) 
<zhongshuang...@envisioncn.com<mailto:zhongshuang...@envisioncn.com>> wrote:

I have a Spark Streaming application, which dynamically calling a jar (Java 
SPI), and the jar is called in a mapWithState() function, it was working fine 
for a long time.
Recently, I got a requirement which required to reload the jar during runtime.
But when the reloading is completed, the spark streaming job got failed, and I 
get the following exception, it seems the spark try to deserialize the 
checkpoint failed.
My question is whether the logic in the jar will be serialized into checkpoint, 
and is it possible to do the jar reloading during runtime in Spark Streaming?



[2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, 
ip-10-21-14-205.envisioncn.com<http://ip-10-21-14-205.envisioncn.com>): 
java.lang.ClassCastException: cannot assign instance of 
scala.collection.immutable.List$SerializationProxy to field 
org.apache.spark.rdd.RDD.org<http://org.apache.spark.rdd.RDD.org>$apache$spark$rdd$RDD$$dependencies_
 of type scala.collection.Seq in instance of 
org.apache.spark.streaming.rdd.MapWithStateRDD
        at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
        at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
 (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,298] ERROR Task 1 in stage 5355.0 failed 8 times; aborting 
job (org.apache.spark.scheduler.TaskSetManager)
[2017-06-06 17:13:12,314] ERROR Error running job streaming job 1496740392000 
ms.0 (org.apache.spark.streaming.scheduler.JobScheduler)

Thank you in advanced!

Regards
Jonnas Li



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be 
legally privileged. If you have received it by mistake, please notify the 
sender by return email and delete this message from your system. Any 
unauthorized use or dissemination of this message in whole or in part is 
strictly prohibited. Envision Energy Limited and all its subsidiaries shall not 
be liable for the improper or incomplete transmission of the information 
contained in this email nor for any delay in its receipt or damage to your 
system. Envision Energy Limited does not guarantee the integrity of this email 
message, nor that this email message is free of viruses, interceptions, or 
interference.



本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be 
legally privileged. If you have received it by mistake, please notify the 
sender by return email and delete this message from your system. Any 
unauthorized use or dissemination of this message in whole or in part is 
strictly prohibited. Envision Energy Limited and all its subsidiaries shall not 
be liable for the improper or incomplete transmission of the information 
contained in this email nor for any delay in its receipt or damage to your 
system. Envision Energy Limited does not guarantee the integrity of this email 
message, nor that this email message is free of viruses, interceptions, or 
interference.




本邮件(包括任何附件)内容机密并受法律保护。如果您意外地收到这封邮件,请回复通知发件人并从当前系统中删除本邮件。任何未经授权者,严禁使用并部分或者全部的转发本条信息。任何未经授权的使用或传播都是被严格禁止的。远景能源与其分公司不对因不正确和不完整的转发此邮件包含的信息以及因任何因邮件延迟或对你的系统造成的损害而负责。远景能源不能保证此邮件的真实完整性,也不能确定此邮件是否含有病毒或者监听程序。
This email message (including any attachments) is confidential and may be 
legally privileged. If you have received it by mistake, please notify the 
sender by return email and delete this message from your system. Any 
unauthorized use or dissemination of this message in whole or in part is 
strictly prohibited. Envision Energy Limited and all its subsidiaries shall not 
be liable for the improper or incomplete transmission of the information 
contained in this email nor for any delay in its receipt or damage to your 
system. Envision Energy Limited does not guarantee the integrity of this email 
message, nor that this email message is free of viruses, interceptions, or 
interference.

Reply via email to