Hi David,

I cannot tell exactly how you ended up seeing an OptionalDataException without seeing your code.
Flink supports running multiple jobs on the same cluster. That's what we call session mode.

You should not reuse the ExecutionEnvironment, because then you will create a single job that simply consists of multiple disjoint parts. Calling ExecutionEnvironment.getExecutionEnvironment will give you a fresh ExecutionEnvironment which you can use to submit a new job. Note that you have to call env.execute in a separate thread because it is a blocking operation.

Cheers,
Till

On Thu, Oct 26, 2017 at 10:22 PM, David Dreyfus <dddrey...@gmail.com> wrote:

> Hello,
>
> I am trying to submit multiple jobs to flink from within my Java program.
> I am running into an exception that may be related:
> java.io.OptionalDataException.
>
> Should I be able to create multiple plans/jobs within a single session and
> execute them concurrently?
> If so, is there a working example you can point me at?
>
> Do I share the same ExecutionEnvironment?
> It looks like calls to getExecutionEnvironment() return the same one.
>
> I have a number of different transformations on my data I'd like to make.
> I'd rather not create one very large job and have them processed in
> parallel.
> My cluster has enough resources that performing each job sequentially would
> be very wasteful.
>
> Thank you,
> David
>
> Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
> java.io.OptionalDataException
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>     at java.util.HashMap.readObject(HashMap.java:1407)
>     at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
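
P.S. As a rough illustration of the advice in the reply (one environment per job, and each blocking env.execute call on its own thread), here is a minimal Java sketch. It deliberately avoids the Flink API so that it runs on its own: runJob is a hypothetical stand-in for "get a fresh ExecutionEnvironment, build one job, call env.execute", and the names ConcurrentJobSubmitter and submitAll are made up for this example. It is not taken from David's program and is not the only way to do this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ConcurrentJobSubmitter {

    // Hypothetical stand-in for one Flink job (an assumption for this sketch).
    // In a real program this body would obtain its own environment via
    // ExecutionEnvironment.getExecutionEnvironment(), define the transformations
    // for this job only, and then call env.execute(jobName), which blocks.
    public static String runJob(String jobName) {
        return jobName + ":finished";
    }

    // Runs every job on its own thread so the blocking calls overlap,
    // then collects the results in submission order.
    public static List<String> submitAll(List<String> jobNames) {
        List<String> results = new ArrayList<>();
        if (jobNames.isEmpty()) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(jobNames.size());
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String name : jobNames) {
                Callable<String> task = () -> runJob(name);
                futures.add(pool.submit(task));
            }
            for (Future<String> future : futures) {
                // get() waits for that job's thread to finish.
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // A failure inside one job surfaces here.
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAll(Arrays.asList("jobA", "jobB")));
    }
}
```

The thread pool is sized to the number of jobs so that no submission waits behind another blocking call. With many jobs a bounded pool may be the better choice, depending on how many slots the cluster has.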