Hi Philipp,

to run a Flink program on a cluster from within my IDE, I usually create a RemoteExecutionEnvironment. Since my program defines a UDF (the map function that doubles the values), I also need to specify the jar containing this class. In my case, the jar is called test-1.0-SNAPSHOT.jar.

    // needed for the Scala DataSet API and its implicit type information
    import org.apache.flink.api.scala._

    def main(args: Array[String]) {
      // set up the execution environment
      val env = ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "target/test-1.0-SNAPSHOT.jar")

      val elements = env.fromElements(1, 2, 3, 4, 5)
      val doubled = elements.map(x => 2 * x)

      doubled.printOnTaskManager("TaskManager")

      // execute program
      env.execute("Flink Scala API Skeleton")
    }

But I also tried your approach to submitting jobs to Flink, and it worked for me as well. Therefore, I guess that something is wrong with your job. What happens in PigStorage().load?

Cheers,
Till

On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze <philipp.goe...@tu-ilmenau.de> wrote:

> Hey Tim,
>
> I think my previous mail was intercepted or something similar. However you
> can find my reply below. I already tried a simpler job which just does a
> env.fromElements... but still the same stack.
>
> How do you normally submit jobs (jars) from within the code?
>
> Best Regards,
> Philipp
>
>
> -------- Forwarded Message --------
> Subject: Re: Submitting jobs from within Scala code
> Date: Thu, 16 Jul 2015 14:31:01 +0200
> From: Philipp Goetze <philipp.goe...@tu-ilmenau.de>
> To: user@flink.apache.org
>
> Hey,
>
> from the JobManager I do not get any more hints:
>
> 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Received message RequestJobCounts at akka://flink/user/archive from Actor[akka://flink/temp/$gc].
> 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Handled message RequestJobCounts in 0 ms from Actor[akka://flink/temp/$gc].
> 13:36:06,674 DEBUG org.eclipse.jetty.util.log - RESPONSE /jobsInfo 200
> 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message RequestBlobManagerPort at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
> 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message RequestBlobManagerPort in 0 ms from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
> 13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection - Received PUT request for content addressable BLOB
> 13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
> 13:36:07,087 INFO org.apache.flink.runtime.jobmanager.JobManager - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
> 13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
> 13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting Query)
> org.apache.flink.runtime.client.JobSubmissionException: The vertex null (null) has no invokable class.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:743)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1195)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Starting Query switched from CREATED to FAILING.
> 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Starting Query switched from FAILING to FAILED.
> 13:36:07,088 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) in 1 ms from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
> 13:36:07,524 DEBUG Remoting - Remote system with address [akka.tcp://flink@127.0.0.1:43640] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
>
>
> The code of the job is quite simple (just a test-case). As stated before
> it works when using the wrapper script and the web client. I think
> something is wrong in the submitJar method I posted earlier. But here the
> code of the submitted job:
>
> import org.apache.flink.api.scala._
> import dbis.flink._
>
> object load {
>   def tupleAToString(t: List[Any]): String = {
>     implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
>     val sb = new StringBuilder
>     sb.append(t(0))
>     sb.toString
>   }
>
>   def main(args: Array[String]) {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val A = PigStorage().load(env, "/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt", '\t')
>     A.map(t => tupleAToString(t)).writeAsText("/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/result1.out")
>     env.execute("Starting Query")
>   }
> }
>
>
> Best Regards,
> Philipp
>