[https://issues.apache.org/jira/browse/S4-131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13615190#comment-13615190]
Dingyu Yang commented on S4-131:
---------------------------------
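I found the problem. If the checkpointed PE has a Streamable<Event> downStream
field (the one it uses for downStream.put(event)), that field should be declared
transient. Without transient, storing the checkpoint fails: Kryo serializes
downStream together with the PE state and follows its references (Stream -> app
-> checkpointing framework -> thread pool -> class loader, as the serialization
trace below shows), and iterating the class loader's class list is where the
ConcurrentModificationException is thrown.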
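The effect of transient can be sketched with plain Java serialization as a stand-in for Kryo. This is an assumption for illustration only: Kryo 2.x's FieldSerializer likewise skips transient fields, but unlike Java serialization it does not require Serializable, so instead of failing fast on the first unsuitable object it keeps walking the graph, as in the trace. RuntimeHandle, LeakyPE and FixedPE are hypothetical stand-ins for the S4 stream and PE.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    /** Hypothetical runtime-only object (stands in for a Stream); deliberately not Serializable. */
    static class RuntimeHandle { }

    /** Hypothetical PE state whose downStream field is NOT transient: serialization follows it. */
    static class LeakyPE implements Serializable {
        private static final long serialVersionUID = 1L;
        int count = 3;
        RuntimeHandle downStream = new RuntimeHandle();
    }

    /** Same state with downStream marked transient: serialization skips the field. */
    static class FixedPE implements Serializable {
        private static final long serialVersionUID = 1L;
        int count = 3;
        transient RuntimeHandle downStream = new RuntimeHandle();
    }

    /** Returns true if the object graph serializes, false if a non-serializable object was reached. */
    static boolean canCheckpoint(Object pe) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(pe);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + canCheckpoint(new LeakyPE())); // leaky: false
        System.out.println("fixed: " + canCheckpoint(new FixedPE())); // fixed: true
    }
}
```

Only the PE's own state (count) ends up in the checkpoint once the runtime reference is transient.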
Another problem is that if the PE has no no-arg constructor (my PE does its
initialization in onCreate()), recovery fails as follows:
19:21:15.579 [resultStream] ERROR org.apache.s4.core.ProcessingElement - Cannot restore state for key [[PROTO_ID];[KEY] --> [example.WordCount];[results]]: Class cannot be created (missing no-arg constructor): example.WordCount
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): example.WordCount
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1048) ~[kryo-2.20.jar:na]
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1060) ~[kryo-2.20.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228) ~[kryo-2.20.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217) ~[kryo-2.20.jar:na]
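Roughly, the failing step is a reflective lookup of a no-arg constructor, which is needed to create the object before its fields are filled in. The sketch below imitates that lookup with plain java.lang.reflect (it is not Kryo's code); WordCountLike and WordCountWithNoArg are hypothetical classes shaped like the failing PE.

```java
import java.lang.reflect.Constructor;

public class NoArgConstructorDemo {

    /** Hypothetical PE-like class that only has a constructor with arguments. */
    static class WordCountLike {
        final String name;
        WordCountLike(String name) { this.name = name; }
    }

    /** Same shape plus an explicit no-arg constructor. */
    static class WordCountWithNoArg {
        String name;
        WordCountWithNoArg() { }
        WordCountWithNoArg(String name) { this.name = name; }
    }

    /**
     * Imitates constructor-based instantiation: look up the no-arg constructor
     * reflectively and call it. Returns null when no such constructor exists.
     */
    static <T> T newInstanceViaNoArgConstructor(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (NoSuchMethodException e) {
            return null; // the situation Kryo reports as "missing no-arg constructor"
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(newInstanceViaNoArgConstructor(WordCountLike.class));              // null
        System.out.println(newInstanceViaNoArgConstructor(WordCountWithNoArg.class) != null); // true
    }
}
```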
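To fix this, modify KryoSerDeser.java in s4-comm to use Objenesis, by setting an
instantiator strategy in initialValue():

import org.objenesis.strategy.StdInstantiatorStrategy;

protected Kryo initialValue() {
    Kryo kryo = new Kryo();
    kryo.setClassLoader(classLoader);
    kryo.setRegistrationRequired(false);
    // Create instances through Objenesis, so classes without a no-arg constructor can be restored
    kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
    return kryo;
}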
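Objenesis's StdInstantiatorStrategy creates objects without running any of their constructors. Java's own deserialization of a Serializable class behaves similarly (the class's constructors are skipped), so it serves as a stdlib stand-in in the sketch below; it is not Kryo or Objenesis code, and WordCountLike is hypothetical. One consequence worth checking: a transient field that is only set in a constructor (such as downStream) stays null after such a restore.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConstructorBypassDemo {

    static int constructorCalls = 0;

    /** Hypothetical PE-like class with no no-arg constructor. */
    static class WordCountLike implements Serializable {
        private static final long serialVersionUID = 1L;
        int count;
        transient StringBuilder downStream; // runtime-only field, set up in the constructor

        WordCountLike(int count) {
            constructorCalls++;
            this.count = count;
            this.downStream = new StringBuilder();
        }
    }

    /** Serializes and deserializes an object with standard Java serialization. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        WordCountLike original = new WordCountLike(42);
        WordCountLike restored = roundTrip(original);
        System.out.println(restored.count);              // 42: serialized state is restored
        System.out.println(constructorCalls);            // 1: the constructor ran only for 'original'
        System.out.println(restored.downStream == null); // true: the transient field was never re-initialized
    }
}
```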
> checkpoint error "CancellationException"
> -----------------------------------------
>
> Key: S4-131
> URL: https://issues.apache.org/jira/browse/S4-131
> Project: Apache S4
> Issue Type: Bug
> Affects Versions: 0.6
> Environment: The configuration is as follows:
> Application code:
> ...
> wordSumPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.TIME).frequency(20).timeUnit(TimeUnit.SECONDS).build());
> ...
> Deploy command:
> ./s4 deploy -a=example.wordcountApp -c=testCluster1 -appName=wordApp \
>   -p=s4.checkpointing.filesystem.storageRootPath=/home/tmp/s4checkpoint \
>   -emc=org.apache.s4.core.ft.FileSystemBackendCheckpointingModule
> Reporter: Dingyu Yang
> Priority: Minor
> Fix For: 0.7
>
>
> Then I get this error:
> 14:21:50.251 [Checkpointing-storage-0] WARN org.apache.s4.core.ft.SaveStateTask - Cannot save checkpoint : [PROTO_ID];[KEY] --> [example.WordSumPE];[word]
> java.util.concurrent.ExecutionException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> contextClassLoader (java.lang.Thread)
> thread (java.util.concurrent.ThreadPoolExecutor$Worker)
> workers (java.util.concurrent.ThreadPoolExecutor)
> fetchingThreadPool (org.apache.s4.core.ft.SafeKeeper)
> checkpointingFramework (example.wordcountApp)
> app (org.apache.s4.core.Stream)
> downStream (example.WordSumPE)
> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:232) ~[na:1.6.0_22]
> at java.util.concurrent.FutureTask.get(FutureTask.java:91) ~[na:1.6.0_22]
> at org.apache.s4.core.ft.SaveStateTask.run(SaveStateTask.java:66) ~[bin/:na]
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_22]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_22]
> at java.lang.Thread.run(Thread.java:662) [na:1.6.0_22]
> Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> contextClassLoader (java.lang.Thread)
> thread (java.util.concurrent.ThreadPoolExecutor$Worker)
> workers (java.util.concurrent.ThreadPoolExecutor)
> fetchingThreadPool (org.apache.s4.core.ft.SafeKeeper)
> checkpointingFramework (example.wordcountApp)
> app (org.apache.s4.core.Stream)
> downStream (example.WordSumPE)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:552) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:68) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:571) ~[kryo-2.20.jar:na]
> at org.apache.s4.comm.serialize.KryoSerDeser.serialize(KryoSerDeser.java:91) ~[bin/:na]
> at org.apache.s4.core.ProcessingElement.serializeState(ProcessingElement.java:802) ~[bin/:na]
> at org.apache.s4.core.ft.SerializeTask.call(SerializeTask.java:42) ~[bin/:na]
> at org.apache.s4.core.ft.SerializeTask.call(SerializeTask.java:1) ~[bin/:na]
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) ~[na:1.6.0_22]
> at java.util.concurrent.FutureTask.run(FutureTask.java:138) ~[na:1.6.0_22]
> ... 3 common frames omitted
> Caused by: java.util.ConcurrentModificationException: null
> at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372) ~[na:1.6.0_22]
> at java.util.AbstractList$Itr.next(AbstractList.java:343) ~[na:1.6.0_22]
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504) ~[kryo-2.20.jar:na]
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ~[kryo-2.20.jar:na]
> ... 35 common frames omitted
> I debugged the program: in SaveStateTask.run, the call
> futureSerializedState.get(1000, TimeUnit.MILLISECONDS) on the FutureTask
> throws a CancellationException, which leads to the error shown above.
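SaveStateTask waits for the serialization result with futureSerializedState.get(1000, TimeUnit.MILLISECONDS). How get() fails depends on what happened to the task: an exception thrown inside the task (as in the trace above) comes back wrapped in an ExecutionException, while a CancellationException means the future was cancelled before it completed. The sketch below shows both outcomes with java.util.concurrent; serializeTask is a hypothetical stand-in for S4's SerializeTask, and it modifies its list single-threaded so the fail-fast iterator check fires deterministically (in the real trace the modification presumably came from another thread loading classes).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureGetDemo {

    /** Hypothetical "serialize" task that iterates a list which is modified mid-iteration. */
    static Callable<Integer> serializeTask(List<String> classes) {
        return () -> {
            int written = 0;
            for (String name : classes) {           // fail-fast iterator, as in the trace
                if (written == 0) {
                    classes.add("loadedMeanwhile"); // stands in for a class being loaded concurrently
                }
                written++;
            }
            return written;
        };
    }

    /** Waits for the result with a timeout and reports how get() ended. */
    static String outcome(Future<Integer> future) {
        try {
            return "ok:" + future.get(1000, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // the task itself threw: the original exception is the cause
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (CancellationException e) {
            // cancel() was called on the future before it completed
            return "CancellationException";
        } catch (TimeoutException e) {
            return "TimeoutException";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "InterruptedException";
        }
    }

    static String failingOutcome() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            List<String> classes = new ArrayList<>(Arrays.asList("A", "B", "C"));
            return outcome(pool.submit(serializeTask(classes)));
        } finally {
            pool.shutdownNow();
        }
    }

    static String cancelledOutcome() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> slow = pool.submit(() -> {
                Thread.sleep(5_000);
                return 0;
            });
            slow.cancel(true); // cancelled before it can complete
            return outcome(slow);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(failingOutcome());   // ExecutionException caused by ConcurrentModificationException
        System.out.println(cancelledOutcome()); // CancellationException
    }
}
```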