Hi!

Thanks a lot for reporting this.
It turns out that this is a nasty bug: 
https://issues.apache.org/jira/browse/FLINK-7041.

Aljoscha is working on fixing it already. It’s definitely a critical bug, so 
we’ll try to include in the next bugfix release.

Cheers,
Gordon
On 29 June 2017 at 7:05:09 PM, 周思华 (summerle...@163.com) wrote:


I will keep the call to special my rocksdb option later, OptionFactory have 
already extended the java.io.Serializable interface and MRocksDBFactory  
implement from OptionFactory , so MRocksDBFactory should have the 
Serializability. Why this problem occur? 

At 2017-06-29 17:53:07, "Ted Yu" <yuzhih...@gmail.com> wrote:
Since MRocksDBFactory doesn't add any option, it seems 
rocksDBBackEnd.setOptions() call can be skipped.

If you choose to keep the call, please take a look at (OptionsFactory extends 
java.io.Serializable):

https://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html

On Thu, Jun 29, 2017 at 2:16 AM, 周思华 <summerle...@163.com> wrote:
I use the follow code to set RocksDBStateBackend and it option, it can run 
correctly locally, but can't be submitted to cluster.

Main.class:
public static void main() {
       final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend rocksDBBackEnd = new 
RocksDBStateBackend("file:///Users/zsh/tmp/rocksdb");
        rocksDBBackEnd.setPredefinedOptions(PredefinedOptions.DEFAULT);
        rocksDBBackEnd.setOptions(new MRocksDBFactory());
        env.setStateBackend(rocksDBBackEnd);
        ...............        
        env.execute(jobName);
}

MRocksDBFactory.class:
public class MRocksDBFactory implements OptionsFactory {
@Override
public DBOptions createDBOptions(DBOptions currentOptions) {

return currentOptions;
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions) {

return currentOptions;

}
}

The exception info in jobmanager.log look like below:

2017-06-29 16:29:27,162 WARN  akka.remote.ReliableDeliverySupervisor            
            - Association with remote system 
[akka.tcp://flink@10.242.98.255:52638] has failed, address is now gated for 
[5000] ms. Reason: [gerryzhou.MRocksDBFactory]
2017-06-29 16:29:27,163 ERROR Remoting                                          
            - gerryzhou.MRocksDBFactory
java.lang.ClassNotFoundException: gerryzhou.MRocksDBFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
at 
akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:192)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Can anybody help?


 



 




 

Reply via email to