Re: Gelly EOFException
For now, there is a validator that checks whether the vertex ids correspond to the target/src ids in the edges. If you want to check for vertex ID uniqueness, you'll have to implement your own custom validator... I know people with the same error outside Gelly, so I doubt that the lack of unique ids triggered the exception :) On Thu, Jul 16, 2015 at 6:14 PM, Flavio Pompermaier pomperma...@okkam.it wrote: I thought a bit about this error..in my job I was generating multiple vertices with the same id. Could this cause such errors? Maybe there could be a check about such situations in Gelly.. On Tue, Jul 14, 2015 at 10:00 PM, Andra Lungu lungu.an...@gmail.com wrote: Hello, Sorry for the delay. The bug is not in Gelly, but is, as hinted in the exception and as can be seen in the logs, in Flink's Runtime. Mihail may actually be on to something. The bug is actually very similar to the one described in FLINK-1916. However, as can be seen in the discussion thread there, it's a bit difficult to fix it without some steps to reproduce. I unfortunately managed to reproduce it and have opened a Jira... FLINK-2360 https://issues.apache.org/jira/browse/FLINK-2360. It's a similar delta iteration setting. Hope we can get some help with this. Thanks! Andra On Tue, Jul 14, 2015 at 2:12 PM, Mihail Vieru vi...@informatik.hu-berlin.de wrote: Hi, looks very similar to this bug: https://issues.apache.org/jira/browse/FLINK-1916 Best, Mihail On 14.07.2015 14:09, Andra Lungu wrote: Hi Flavio, Could you also show us a code snippet? On Tue, Jul 14, 2015 at 2:06 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi to all, in my vertex centric iteration I get the following exception, am I doing something wrong or is it a bug of Gelly? starting iteration [1]: CoGroup (Messaging) (6/8) IterationHead(WorksetIteration (Vertex-centric iteration (test.gelly.functions.VUpdateFunction@1814786f | test.gelly.functions.VMessagingFunction@67eecbc2))) (4/8) switched to FAILED with exception. 
java.io.EOFException
    at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
    at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:187)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Best, Flavio
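As an illustration of the uniqueness check mentioned above, a minimal sketch using the plain DataSet Scala API, run before the Gelly graph is built, could look like the following. The name vertices and the (id, value) element shape are assumptions for illustration, not part of the original job:

  import org.apache.flink.api.scala._

  // vertices: DataSet[(Long, String)] -- (id, value) pairs produced by the job (assumed shape)
  val duplicateIds = vertices
    .map(v => (v._1, 1))   // emit one count per vertex id
    .groupBy(0)            // group by the id field
    .sum(1)                // total occurrences per id
    .filter(_._2 > 1)      // keep only ids that appear more than once

  duplicateIds.print()     // any id listed here occurs more than once in the graph input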
Re: Flink Kafka example in Scala
Thanks! I tried your updated MySimpleStringSchema and it works for both source and sink. However, my problem is the runtime error "Data stream sinks cannot be copied" listed in my previous post. I hope someone has run into the problem before and can give a hint. Wendong -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-example-in-Scala-tp2069p2109.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Flink Kafka example in Scala
Hey, The reason you are getting that error is because you are calling print after adding a sink, which is an invalid operation. Remove either addSink or print :) Cheers, Gyula On Thu, Jul 16, 2015 at 7:37 PM Wendong wendong@gmail.com wrote: Thanks! I tried your updated MySimpleStringSchema and it works for both source and sink. However, my problem is the runtime error Data stream sinks cannot be copied as listed in previous post. I hope someone ran into the problem before and can give a hint. Wendong -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-example-in-Scala-tp2069p2109.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
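For illustration, a rough sketch of the rearranged pipeline, reusing the constructor calls from the snippets elsewhere in this thread; the addresses, topic name, schema, and imports follow those snippets and the 0.9 connector layout and are assumptions, not a verified setup:

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.connectors.kafka.api._
  import org.apache.flink.streaming.util.serialization.SimpleStringSchema

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val stream = env
    .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

  // Either write the stream to Kafka (the modified schema from the gist above may be needed here) ...
  stream.addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
  // ... or inspect it with stream.print(), but do not call print() on the result of addSink()

  env.execute("Kafka example")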
Re: Sort Benchmark infrastructure
Hi George, Thanks for the details. It looks like I have a long way to go. For the big data benchmark, I would like to use those test cases, test data and test methodology to test different big data technologies. BTW, I agree with you that no one system will necessarily be optimal for all cases for all workloads. I hope I can find a good one for our enterprise application. I will let you know if I can move this forward. Good Night. Best regards Hawin On Wed, Jul 15, 2015 at 9:30 AM, George Porter gmpor...@cs.ucsd.edu wrote: Hi Hawin, We used varying numbers of the i2.8xlarge servers, depending on the sort record category. http://sortbenchmark.org/ is really your best source for what we did--all the details should be in our write-ups. Note that we pro-rated the cost, meaning that if we ran for 15 minutes, we took the hourly rate and divided by 4. In terms of sponsorship, we used a combination of credits donated by Amazon, as well as funding from the National Science Foundation. You can submit a grant proposal to Amazon and ask them for credits if you're an academic or researcher. Not sure if being part of an open-source project counts, but you might as well try. In terms of the sort record, that webpage I provided above has all the details on the challenge. Not sure about the Big Data benchmark--that term is pretty vague. Often when people say big data, they mean different things. Our system is designed for lots of bytes, but not really lots of compute over those bytes. Others pick different design points. I think you'll find that the needs of different users vary quite a bit, and no one system will necessarily be optimal for all cases for all workloads. Good luck on your attempts. -George George Porter Assistant Professor, Dept. of Computer Science and Engineering Associate Director, UCSD Center for Networked Systems UC San Diego, La Jolla CA http://www.cs.ucsd.edu/~gmporter/ On Wed, Jul 15, 2015 at 1:44 AM, Hawin Jiang hawin.ji...@gmail.com wrote: Hi George and Mike, Thanks for your information. Did you use 186 i2.8xlarge servers for testing? Total one-hour cost = 186 * 6.82 = $1,268.52. Do you know any person or company that can sponsor this? For our test approach, I have checked an industry standard from BigDataBench (http://prof.ict.ac.cn/BigDataBench/industry-standard-benchmarks/). Maybe we can test TeraSort to see whether the performance is better than your record or not. Please let me know if you have any comments. Thanks for the support. Best regards Hawin On Tue, Jul 14, 2015 at 9:42 AM, Mike Conley mcon...@cs.ucsd.edu wrote: George is correct. We used i2.8xlarge with placement groups on Amazon EC2. We ran Amazon Linux, which if I recall correctly is based on Red Hat, but optimized for EC2. The OS was essentially unmodified, with some packages installed for our dependencies. Thanks, Mike On Tue, Jul 14, 2015 at 9:15 AM, George Porter gmpor...@cs.ucsd.edu wrote: Hello Hawin, Thanks for reaching out. We wrote a paper on our efforts, which we'll be posting to our website in a couple of weeks. However, in summary, we used a cluster of i2.8xlarge instance types from Amazon, and we made use of the placement group feature to ensure that we'd get good bandwidth between them. Mike can correct me if I'm wrong, but I believe we used the stock AWS version of Linux (Ubuntu maybe?). So our environment was pretty stock--we didn't get any special support or features from AWS. Best of luck with your profiling and benchmarking. Do let us know how you perform.
Flink looks like a pretty interesting project, so let us know if we can help y'all out in some way. Thanks, George On Sun, Jul 12, 2015 at 11:12 PM, Hawin Jiang hawin.ji...@gmail.com wrote: Hi Michael and George, First of all, congratulations on winning the sort record again. We are from the Flink community. I am not sure if it is possible to get your test environment to test our Flink for free. We saw that Apache Spark did a good job as well. We want to challenge your records, but we don't have that many servers for testing. Please let me know if you can help us or not. Thank you very much. Best regards Hawin
Re: Flink Scala performance
I ran it on local, from terminal. And it's the Word Count example so it's small -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2074.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Flink Scala performance
Hi Max, When I call 'flink run', it doesn't show any information like that -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2083.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Flink Scala performance
I found it in JobManager log 21:16:54,986 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using 25 MB for Flink managed memory. is there a way to explicitly assign this for local ? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2087.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Flink Scala performance
You can increase Flink managed memory by increasing Taskmanager JVM Heap (taskmanager.heap.mb) in flink-conf.yaml. There is some explanation of options in Flink documentation [1]. Regards, Chiwan Park [1] https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#common-options On Jul 16, 2015, at 7:23 PM, Vinh June hoangthevinh@gmail.com wrote: I found it in JobManager log 21:16:54,986 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using 25 MB for Flink managed memory. is there a way to explicitly assign this for local ? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2087.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
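For reference, the relevant entries in conf/flink-conf.yaml might look like the following; the numbers are placeholders to adapt, not recommendations:

  taskmanager.heap.mb: 1024          # JVM heap of each TaskManager, in MB
  taskmanager.memory.fraction: 0.7   # fraction of that heap used as Flink managed memory (0.7 is the default)

After changing the file, restart the local Flink instance so the TaskManager picks up the new values.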
Re: Submitting jobs from within Scala code
Hi Philipp, could you post the complete log output. This might help to get to the bottom of the problem. Cheers, Till On Thu, Jul 16, 2015 at 11:01 AM, Philipp Goetze philipp.goe...@tu-ilmenau.de wrote: Hi community, in our project we try to submit built Flink programs to the jobmanager from within Scala code. The test program is executed correctly when submitted via the wrapper script bin/flink run ... and also with the webclient. But when executed from within the Scala code nothing seems to happen, but the following warning is found in the log: 10:47:18,153 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:34074] has failed, address is now gated for [5000] ms. Reason is: [org.apache.flink.runtime.jobgraph.AbstractJobVertex] Our submit method looks like that: def submitJar(master: String, path: String, className: String, args: String*) = { val file = new File(path) val parallelism = 1 val wait = true try { val program = new PackagedProgram(file, className, args:_*) val jobManagerAddress = getInetFromHostport(master) val client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader(), 1) println(Executing + path); client.run(program, parallelism, wait); } catch { case e: ProgramInvocationException = e.printStackTrace() } } I took this as a reference: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-submit-flink-jars-from-plain-Java-programs-td656.html I hope you can help. Best Regards, Philipp Götze
Re: Flink Scala performance
Vinh, Are you using the sample data built into the example, or are you using your own data? On Thu, Jul 16, 2015 at 8:54 AM, Vinh June hoangthevinh@gmail.com wrote: I ran it on local, from terminal. And it's the Word Count example so it's small -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2074.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Flink Scala performance
Hey Vinh, you have to look into the logs folder and find the log of the TaskManager (something like *taskmanager*.log) – Ufuk On 16 Jul 2015, at 11:35, Vinh June hoangthevinh@gmail.com wrote: Hi Max, When I call 'flink run', it doesn't show any information like that -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2083.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Flink Scala performance
HI Vinh, If you run your program locally, then Flink uses the local execution mode which allocates only little managed memory. Managed memory is used by Flink to perform operations on serialized data. These operations can get slow if too little memory gets allocated because data needs to be spilled to disk. That would of course be different in a cluster environment where you configure the memory explicitly. When the task manager starts, it tells you how much memory it allocates. For example, in my case: 10:12:37,655 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using 1227 MB for Flink managed memory. How does that look in your case? Cheers, Max On Thu, Jul 16, 2015 at 8:54 AM, Vinh June hoangthevinh@gmail.com wrote: I ran it on local, from terminal. And it's the Word Count example so it's small -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2074.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Flink Kafka example in Scala
Hi, your first example doesn't work because the SimpleStringSchema does not work for sinks. You can use this modified serialization schema: https://gist.github.com/aljoscha/e131fa8581f093915582. This works for both source and sink (I think the current SimpleStringSchema is not correct and should be changed in the next release.) Cheers, Aljoscha On Thu, 16 Jul 2015 at 08:37 Anwar Rizal anriza...@gmail.com wrote: The compilation error is because you don't declare a dependency on flink-streaming-scala. In SBT, you define something like: libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0" On Thu, Jul 16, 2015 at 6:36 AM, Wendong wendong@gmail.com wrote: I tried, but got an error:

[error] TestKafka.scala:11: object scala is not a member of package org.apache.flink.streaming.api
[error] import org.apache.flink.streaming.api.scala._

So I switched back to my original import statements. Now I changed SimpleStringSchema to JavaDefaultStringSchema in addSink(new KafkaSink(...)), and the compilation error was gone. The problem is that there is a runtime error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
Caused by: java.lang.RuntimeException: Data stream sinks cannot be copied
    at org.apache.flink.streaming.api.datastream.DataStreamSink.copy(DataStreamSink.java:43)
    at org.apache.flink.streaming.api.datastream.DataStreamSink.copy(DataStreamSink.java:30)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1341)
    at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:1029)
    ...

Googled the error message but didn't find useful information. Anyone can shed some light? Thanks! Wendong -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-example-in-Scala-tp2069p2071.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Submitting jobs from within Scala code
Hi community, in our project we try to submit built Flink programs to the JobManager from within Scala code. The test program is executed correctly when submitted via the wrapper script bin/flink run ... and also with the webclient. But when executed from within the Scala code nothing seems to happen, and the following warning is found in the log:

10:47:18,153 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:34074] has failed, address is now gated for [5000] ms. Reason is: [org.apache.flink.runtime.jobgraph.AbstractJobVertex]

Our submit method looks like this:

def submitJar(master: String, path: String, className: String, args: String*) = {
  val file = new File(path)
  val parallelism = 1
  val wait = true
  try {
    val program = new PackagedProgram(file, className, args: _*)
    val jobManagerAddress = getInetFromHostport(master)
    val client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader(), 1)
    println("Executing " + path)
    client.run(program, parallelism, wait)
  } catch {
    case e: ProgramInvocationException => e.printStackTrace()
  }
}

I took this as a reference: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-submit-flink-jars-from-plain-Java-programs-td656.html I hope you can help. Best Regards, Philipp Götze
Containment Join Support
Hi everyone, at first, thanks for building this great framework! We are using Flink and especially Gelly for building a graph analytics stack (gradoop.com). I was wondering if there is [planned] support for a containment join operator. Consider the following example:

DataSet<List<Int>> left := {[0, 1], [2, 3, 4], [5]}
DataSet<Tuple2<Int, Int>> right := {(0, 1), (1, 0), (2, 1), (5, 2)}

What I want to compute is

left.join(right).where(list).contains(tuple.f0) := { ([0, 1], (0, 1)), ([0, 1], (1, 0)), ([2, 3, 4], (2, 1)), ([5], (5, 2)) }

At the moment, I am solving that using cross and filter, which can be expensive. The generalization of that operator would be set containment join, where you join if the right set is contained in the left set. If there is a general need for that operator, I would also like to contribute to its implementation. But maybe there is already another nice solution which I didn't discover yet? Any help would be appreciated. Especially since I would also like to contribute some of our graph operators (e.g., graph summarization) back to Flink/Gelly (current WIP state can be found here: [1]). Thanks, Martin [1] https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/Summarization.java
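To make the cross-and-filter workaround mentioned above concrete, a rough Scala sketch could look like this; the names left and right and their element types mirror the example and are assumptions, not code from the original stack:

  import org.apache.flink.api.scala._

  // left:  DataSet[List[Int]]   e.g. {[0, 1], [2, 3, 4], [5]}
  // right: DataSet[(Int, Int)]  e.g. {(0, 1), (1, 0), (2, 1), (5, 2)}
  val contained = left
    .cross(right)                                              // build all (list, tuple) pairs -- this is the expensive part
    .filter { case (list, tuple) => list.contains(tuple._1) }  // keep pairs whose list contains the tuple's first field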
Re: How can handles Exist ,not Exist query on flink
ds1.filter(/* here selection of query 1 */)
ds2.filter(/* here selection of query 2 */)

exist:

ds1.join(ds2.distinct("id")).where("id").equalTo("id") {
  // join by your join key(s) - note the distinct operator, otherwise you will get many lines for each input line
  (left, right) => left // collect left
}

or

ds1.coGroup(ds2).where("id").equalTo("id") {
  // cogroup by your join key(s)
  (left: Iterator, right: Iterator, out: Collector) =>
    if (right.hasNext)          // something exists in the right dataset
      while (left.hasNext)      // collect all the left
        out.collect(left.next)
}

not exist:

ds1.coGroup(ds2).where("id").equalTo("id") {
  // cogroup by your join key(s)
  (left: Iterator, right: Iterator, out: Collector) =>
    if (!right.hasNext)         // nothing exists in the right dataset - note the not (exclamation mark) in front
      while (left.hasNext)      // collect all the left
        out.collect(left.next)
}

In short, you are doing a full outer join and keeping only elements with at [least one (exist) | no (not exist)] matching element. This is just a sketch written on my smartphone; you should re-adapt it to your query. Cheers On 16 Jul 2015, at 00:44, hagersaleh loveallah1...@yahoo.com wrote: please help, I want an example -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-can-handles-Exist-not-Exist-query-on-flink-tp1939p2068.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Flink Scala performance
If you use the sample data from the example, there must be an issue with the setup. In Flink's standalone mode, it runs in 100ms on my machine. It may be possible that the command line client takes a long time to start up, so it appears that the program's run time is long. If it takes so long, one reason may be slow DNS resolution. You can check that by looking at the logs of the client process (in the log folder). Stephan On Thu, Jul 16, 2015 at 2:06 PM, Vinh June hoangthevinh@gmail.com wrote: @Stephan: I use the sample data that comes with the sample -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2091.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Submitting jobs from within Scala code
Hi Philipp, it seems that Stephan was right and that your JobGraph is somehow corrupted. You can see it in the exception JobSubmissionException that the JobGraph contains a vertex whose InvokableClassName is null. Furthermore, even the ID and the vertex name are null. This is a strong indicator, that the JobGraph is not correct. Can you also post the log of the JobManager? Do you have the code of your job online? Cheers, Till On Thu, Jul 16, 2015 at 1:20 PM, Philipp Goetze philipp.goe...@tu-ilmenau.de wrote: Hey Tim, here the console output now with log4j: 0[pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - Starting program in interactive mode 121 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map() 137 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map() 183 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.client.program.Client - Set parallelism 1, plan default parallelism 1 198 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.optimizer.Optimizer - Beginning compilation of program 'Starting Query' 198 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.optimizer.Optimizer - Using a default parallelism of 1 198 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.optimizer.Optimizer - Using default data exchange mode PIPELINED 266 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.common.io.FileInputFormat - Opening input split file:/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt [0,32] 269 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.common.io.FileInputFormat - Opening input split file:/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt [16,16] 412 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - JobManager actor system address is localhost/127.0.0.1:6123 412 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - Starting client actor system 415 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO 
org.apache.flink.runtime.client.JobClient - Starting JobClient actor system 922 [flink-akka.actor.default-dispatcher-2] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 953 [flink-akka.actor.default-dispatcher-2] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 954 [flink-akka.actor.default-dispatcher-2] DEBUG akka.event.EventStream - Default Loggers started 1044 [flink-akka.actor.default-dispatcher-4] INFO Remoting - Starting remoting 1117 [flink-akka.remote.default-remote-dispatcher-6] DEBUG org.jboss.netty.channel.socket.nio.SelectorUtil - Using select timeout of 500 1118 [flink-akka.remote.default-remote-dispatcher-6] DEBUG org.jboss.netty.channel.socket.nio.SelectorUtil - Epoll-bug workaround enabled = false 1325 [flink-akka.actor.default-dispatcher-2] INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:58455] 1343 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.runtime.client.JobClient - Started JobClient actor system at 127.0.0.1:58455 1343 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - Looking up JobManager 1542 [flink-akka.actor.default-dispatcher-2] DEBUG akka.serialization.Serialization(akka://flink) - Using serializer[akka.serialization.JavaSerializer] for
Re: Submitting jobs from within Scala code
Hey Tim, here the console output now with log4j: 0[pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - Starting program in interactive mode 121 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map() 137 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map() 183 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types: 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE 188 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.client.program.Client - Set parallelism 1, plan default parallelism 1 198 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.optimizer.Optimizer - Beginning compilation of program 'Starting Query' 198 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.optimizer.Optimizer - Using a default parallelism of 1 198 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.optimizer.Optimizer - Using default data exchange mode PIPELINED 266 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.common.io.FileInputFormat - Opening input split file:/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt [0,32] 269 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] DEBUG org.apache.flink.api.common.io.FileInputFormat - Opening input split file:/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt [16,16] 412 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - JobManager actor system address is localhost/127.0.0.1:6123 412 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - Starting client actor system 415 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.runtime.client.JobClient - Starting JobClient actor system 922 [flink-akka.actor.default-dispatcher-2] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 953 [flink-akka.actor.default-dispatcher-2] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 954 [flink-akka.actor.default-dispatcher-2] DEBUG akka.event.EventStream - Default Loggers started 1044 [flink-akka.actor.default-dispatcher-4] INFO Remoting - Starting remoting 1117 [flink-akka.remote.default-remote-dispatcher-6] DEBUG 
org.jboss.netty.channel.socket.nio.SelectorUtil - Using select timeout of 500 1118 [flink-akka.remote.default-remote-dispatcher-6] DEBUG org.jboss.netty.channel.socket.nio.SelectorUtil - Epoll-bug workaround enabled = false 1325 [flink-akka.actor.default-dispatcher-2] INFO Remoting - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:58455] 1343 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.runtime.client.JobClient - Started JobClient actor system at 127.0.0.1:58455 1343 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - Looking up JobManager 1542 [flink-akka.actor.default-dispatcher-2] DEBUG akka.serialization.Serialization(akka://flink) - Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.Identify] 1567 [flink-akka.actor.default-dispatcher-2] DEBUG akka.remote.EndpointWriter - Drained buffer with maxWriteCount: 50, fullBackoffCount: 1, smallBackoffCount: 0, noBackoffCount: 0 , adaptiveBackoff: 1000 1599 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - JobManager runs at akka.tcp://flink@127.0.0.1:6123/user/jobmanager 1600 [pool-7-thread-1-ScalaTest-running-FlinkCompileIt] INFO org.apache.flink.client.program.Client - Communication between client and
Re: Flink Scala performance
Here are my logs http://pastebin.com/AJwiy2D8 http://pastebin.com/K05H3Qur From the client log, it seems to take ~2s, but with 'time flink run ...', the actual time is ~18s -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2095.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Cluster execution -jar files-
Hi, I would like to use createRemoteEnvironment to run the application on a cluster and I have some questions. Following the documentation in [1], it is not clear to me how to use it. What should be the content of the jar file? All the external libraries that I use? Or do I need to include the program's map/reduce code to be distributed as well? In the latter case, why should I redefine all the operations again in the main source? Shouldn't they be included in the jar files? Many thanks Juan
Re: Cluster execution -jar files-
As the JavaDoc explains:

* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
*                 user-defined functions, user-defined input formats, or any libraries, those must be
*                 provided in the JAR files.

- external libraries, yes
- your program code, no -- except your UDFs, those yes

-Matthias On 07/16/2015 04:06 PM, Juan Fumero wrote: Missing reference: [1] https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/cluster_execution.html On Thu, 2015-07-16 at 16:04 +0200, Juan Fumero wrote: Hi, I would like to use the createRemoteEnvironment to run the application in a cluster and I have some questions. Following the documentation in [1] It is not clear to me how to use it. What should be the content of the jar file? All the external libraries that I use? or need to include the program map/reduce to be distributed as well? In the last case, why should I redefine all the operations again in the main source? Shouldn't be included in the jar files? Many thanks Juan
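A minimal Scala sketch of that setup; the host, port, jar path, and output path are placeholders, and the jar is assumed to contain the job's UDF classes plus any extra libraries:

  import org.apache.flink.api.scala._

  val env = ExecutionEnvironment.createRemoteEnvironment(
    "localhost", 6123, "target/my-job.jar")  // placeholder host/port/jar

  val elements = env.fromElements(1, 2, 3, 4, 5)
  val doubled = elements.map(x => 2 * x)     // this map UDF is shipped inside the jar listed above
  doubled.writeAsText("/tmp/doubled")        // placeholder output path

  env.execute("remote example")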
Re: Flink Scala performance
Is it possible that it takes a long time to spawn JVMs on your system? That this takes up all the time? On Thu, Jul 16, 2015 at 3:34 PM, Vinh June hoangthevinh@gmail.com wrote: Here are my logs http://pastebin.com/AJwiy2D8 http://pastebin.com/K05H3Qur from client log, it seems to take ~2s, but with time flink run ..., actual time is ~18s -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2095.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: getIndexOfThisSubtask : starts at 0 or 1 ?
You are right, Arnaud. Sorry about this. :( I'm pushing a fix right now (for 0.9.1 as well). Thanks for reporting this! On 16 Jul 2015, at 16:22, LINZ, Arnaud al...@bouyguestelecom.fr wrote: Hello, According to the documentation, getIndexOfThisSubtask starts from 1:

/**
 * Gets the number of the parallel subtask. The numbering starts from 1 and goes up to the parallelism,
 * as returned by {@link #getNumberOfParallelSubtasks()}.
 *
 * @return The number of the parallel subtask.
 */
int getIndexOfThisSubtask();

but in my code in 0.9.0 it starts at 0 and goes up to getNumberOfParallelSubtasks()-1. I suppose that the doc is wrong, then. Best regards, Arnaud
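For illustration, a small sketch of where the 0-based index typically gets used; the mapper class and its types below are made up for the example:

  import org.apache.flink.api.common.functions.RichMapFunction

  class TagWithSubtask extends RichMapFunction[String, String] {
    override def map(value: String): String = {
      // 0-based: ranges from 0 to getNumberOfParallelSubtasks() - 1, as confirmed above
      val idx = getRuntimeContext.getIndexOfThisSubtask
      s"subtask $idx: $value"
    }
  }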
getIndexOfThisSubtask : starts at 0 or 1 ?
Hello, According to the documentation, getIndexOfThisSubtask starts from 1; /** * Gets the number of the parallel subtask. The numbering starts from 1 and goes up to the parallelism, * as returned by {@link #getNumberOfParallelSubtasks()}. * * @return The number of the parallel subtask. */ int getIndexOfThisSubtask(); but in my code in 0.9.0 it starts at 0 and goes up to getNumberOfParallelSubtasks()-1 I suppose that the doc is wrong, then. Best regards, Arnaud L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur. The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
Fwd: Re: Submitting jobs from within Scala code
Hey Tim, I think my previous mail was intercepted or something similar. However you can find my reply below. I already tried a simpler job which just does a env.fromElements... but still the same stack. How do you normally submit jobs (jars) from within the code? Best Regards, Philipp Forwarded Message Subject:Re: Submitting jobs from within Scala code Date: Thu, 16 Jul 2015 14:31:01 +0200 From: Philipp Goetze philipp.goe...@tu-ilmenau.de To: user@flink.apache.org Hey, from the JobManager I do not get any more hints: 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Received message RequestJobCounts at akka://flink/user/archive from Actor[akka://flink/temp/$gc]. 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Handled message RequestJobCounts in 0 ms from Actor[akka://flink/temp/$gc]. 13:36:06,674 DEBUG org.eclipse.jetty.util.log - RESPONSE /jobsInfo 200 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message RequestBlobManagerPort at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b]. 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message RequestBlobManagerPort in 0 ms from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b]. 13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection - Received PUT request for content addressable BLOB 13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443]. 13:36:07,087 INFO org.apache.flink.runtime.jobmanager.JobManager - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query). 13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job 146bc2162de9353bbd457a74eda59ae3 (Starting Query). 13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting Query) org.apache.flink.runtime.client.JobSubmissionException: The vertex null (null) has no invokable class. 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507) at scala.collection.Iterator$class.foreach(Iterator.scala:743) at scala.collection.AbstractIterator.foreach(Iterator.scala:1195) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Starting Query switched from CREATED to FAILING. 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Starting Query switched from FAILING to FAILED. 13:36:07,088 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) in 1 ms
Re: Gelly EOFException
I thought a bit about this error..in my job I was generating multiple vertices with the same id. Could this cause such errors? Maybe there could be a check about such situations in Gelly.. On Tue, Jul 14, 2015 at 10:00 PM, Andra Lungu lungu.an...@gmail.com wrote: Hello, Sorry for the delay. The bug is not in Gelly, but is, as hinted in the exception and as can be seen in the logs, in Flink's Runtime. Mihail may actually be on to something. The bug is actually very similar to the one described in FLINK-1916. However, as can be seen in the discussion thread there, it's a bit difficult to fix it without some steps to reproduce. I unfortunately managed to reproduce it and have opened a Jira... FLINK-2360 https://issues.apache.org/jira/browse/FLINK-2360. It's a similar delta iteration setting. Hope we can get some help with this. Thanks! Andra On Tue, Jul 14, 2015 at 2:12 PM, Mihail Vieru vi...@informatik.hu-berlin.de wrote: Hi, looks very similar to this bug: https://issues.apache.org/jira/browse/FLINK-1916 Best, Mihail On 14.07.2015 14:09, Andra Lungu wrote: Hi Flavio, Could you also show us a code snippet? On Tue, Jul 14, 2015 at 2:06 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi to all, in my vertex centric iteration I get the following exception, am I doing something wrong or is it a bug of Gelly? starting iteration [1]: CoGroup (Messaging) (6/8) IterationHead(WorksetIteration (Vertex-centric iteration (test.gelly.functions.VUpdateFunction@1814786f | test.gelly.functions.VMessagingFunction@67eecbc2))) (4/8) switched to FAILED with exception. java.io.EOFException at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333) at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140) at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201) at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39) at com.esotericsoftware.kryo.io.Output.flush(Output.java:163) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:187) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219) at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536) at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347) at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209) at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Best, Flavio
Re: Flink Scala performance
I just checked in the web job manager: it says the runtime for the Flink job is 349ms, but it actually takes 18s using the time command in the terminal. Should I care more about the latter timing? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Scala-performance-tp2065p2106.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Submitting jobs from within Scala code
Good to hear that your problem is solved :-) Cheers, Till On Thu, Jul 16, 2015 at 5:45 PM, Philipp Goetze philipp.goe...@tu-ilmenau.de wrote: Hi Till, many thanks for your effort. I finally got it working. I'm a bit embarrassed because the issue was solved by using the same flink-dist-JAR from the locally running Flink version. So to say I used an older Snapshot version for compiling than for running :-[ Best Regards, Philipp On 16.07.2015 17:35, Till Rohrmann wrote: Hi Philipp, what I usually do to run a Flink program on a cluster from within my IDE, I create a RemoteExecutionEnvironment. Since I have one UDF (the map function which doubles the values) defined, I also need to specify the jar containing this class. In my case, the jar is called test-1.0-SNAPSHOT.jar. def main(args: Array[String]) { // set up the execution environment val env = ExecutionEnvironment.createRemoteEnvironment(localhost, 6123, target/test-1.0-SNAPSHOT.jar) val elements = env.fromElements(1,2,3,4,5) val doubled = elements.map(x = 2*x) doubled.printOnTaskManager(TaskManager) // execute program env.execute(Flink Scala API Skeleton) } But I also tried your approach of how to submit jobs to Flink and it worked for me as well. Therefore, I guess that there is something wrong with your job. What happens in PigStorage().load? Cheers, Till On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze philipp.goe...@tu-ilmenau.de wrote: Hey Tim, I think my previous mail was intercepted or something similar. However you can find my reply below. I already tried a simpler job which just does a env.fromElements... but still the same stack. How do you normally submit jobs (jars) from within the code? Best Regards, Philipp Forwarded Message Subject: Re: Submitting jobs from within Scala code Date: Thu, 16 Jul 2015 14:31:01 +0200 From: Philipp Goetze philipp.goe...@tu-ilmenau.de philipp.goe...@tu-ilmenau.de To: user@flink.apache.org Hey, from the JobManager I do not get any more hints: 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Received message RequestJobCounts at akka://flink/user/archive from Actor[akka://flink/temp/$gc]. 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Handled message RequestJobCounts in 0 ms from Actor[akka://flink/temp/$gc]. 13:36:06,674 DEBUG org.eclipse.jetty.util.log - RESPONSE /jobsInfo 200 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message RequestBlobManagerPort at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b]. 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message RequestBlobManagerPort in 0 ms from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b]. 13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection - Received PUT request for content addressable BLOB 13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443]. 13:36:07,087 INFO org.apache.flink.runtime.jobmanager.JobManager - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query). 13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job 146bc2162de9353bbd457a74eda59ae3 (Starting Query). 
13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting Query) org.apache.flink.runtime.client.JobSubmissionException: The vertex null (null) has no invokable class. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507) at scala.collection.Iterator$class.foreach(Iterator.scala:743) at scala.collection.AbstractIterator.foreach(Iterator.scala:1195) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at
Re: Flink Kafka example in Scala
Hi Gyula, Cool. I removed .print and the error was gone. However, env.execute failed with errors:

...
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
...
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
...
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSink
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

in the following code:

val stream = env
  .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
  .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

Anything wrong? I already did import org.apache.flink.streaming.connectors.kafka.api._. Class SimpleStringSchema was modified (see previous post). Thanks, Wendong -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-example-in-Scala-tp2069p2112.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.