Hi Philipp,

to run a Flink program on a cluster from within my IDE, I usually create a
remote execution environment with ExecutionEnvironment.createRemoteEnvironment.
Since the job contains one UDF (the map function which doubles the values),
I also need to specify the jar containing this class. In my case, the jar is
called test-1.0-SNAPSHOT.jar.

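import org.apache.flink.api.scala._

def main(args: Array[String]): Unit = {
    // set up the remote execution environment: JobManager host, JobManager
    // port, and the jar that contains the user code (the map function)
    val env = ExecutionEnvironment.createRemoteEnvironment(
      "localhost", 6123, "target/test-1.0-SNAPSHOT.jar")

    val elements = env.fromElements(1, 2, 3, 4, 5)

    val doubled = elements.map(x => 2 * x)

    // write the results to the TaskManagers' standard output,
    // prefixed with "TaskManager"
    doubled.printOnTaskManager("TaskManager")

    // execute program
    env.execute("Flink Scala API Skeleton")
}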

I also tried your approach to submitting jobs to Flink, and it worked for me
as well. I therefore suspect that something is wrong with the job itself.
What happens in PigStorage().load?

Cheers,
Till

On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze <philipp.goe...@tu-ilmenau.de> wrote:

> Hey Till,
>
> I think my previous mail was intercepted or something similar. However, you
> can find my reply below. I already tried a simpler job which just does a
> env.fromElements... but I still get the same stack trace.
>
> How do you normally submit jobs (jars) from within the code?
>
> Best Regards,
> Philipp
>
>
> -------- Forwarded Message --------
> Subject: Re: Submitting jobs from within Scala code
> Date: Thu, 16 Jul 2015 14:31:01 +0200
> From: Philipp Goetze <philipp.goe...@tu-ilmenau.de>
> To: user@flink.apache.org
>
> Hey,
>
> the JobManager log does not give me any further hints:
>
> 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Received message RequestJobCounts at akka://flink/user/archive from Actor[akka://flink/temp/$gc].
> 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Handled message RequestJobCounts in 0 ms from Actor[akka://flink/temp/$gc].
> 13:36:06,674 DEBUG org.eclipse.jetty.util.log - RESPONSE /jobsInfo  200
> 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message RequestBlobManagerPort at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
> 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message RequestBlobManagerPort in 0 ms from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
> 13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection - Received PUT request for content addressable BLOB
> 13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
> 13:36:07,087 INFO  org.apache.flink.runtime.jobmanager.JobManager - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
> 13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
> 13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting Query)
> org.apache.flink.runtime.client.JobSubmissionException: The vertex null (null) has no invokable class.
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:743)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1195)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>       at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>       at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>       at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>       at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Starting Query switched from CREATED to FAILING.
> 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Starting Query switched from FAILING to FAILED.
> 13:36:07,088 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) in 1 ms from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
> 13:36:07,524 DEBUG Remoting - Remote system with address [akka.tcp://flink@127.0.0.1:43640] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
>
>
>
>
> The code of the job is quite simple (just a test case). As stated before,
> it works when using the wrapper script and the web client. I think
> something is wrong in the submitJar method I posted earlier. But here is
> the code of the submitted job:
>
> import org.apache.flink.api.scala._
> import dbis.flink._
>
> object load {
>
>     def tupleAToString(t: List[Any]): String = {
>         implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
>
>         val sb = new StringBuilder
>         sb.append(t(0))
>         sb.toString
>     }
>
>     def main(args: Array[String]) {
>         val env = ExecutionEnvironment.getExecutionEnvironment
>
>         val A = PigStorage().load(env,
>           "/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt",
>           '\t')
>
>         A.map(t => tupleAToString(t))
>           .writeAsText("/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/result1.out")
>
>         env.execute("Starting Query")
>     }
>
> }
>
>
> Best Regards,
> Philipp
>
