Re: [0.10-SNAPSHOT] When naming yarn application (yarn-session -nm), flink run without -m fails.
Just as a quick update on this: the change has been merged into 0.10-SNAPSHOT. Flink now writes the JobManager connection information into the temp directory.

On Wed, Aug 26, 2015 at 6:00 PM, Maximilian Michels m...@apache.org wrote:
Nice. More configuration options :)

On Wed, Aug 26, 2015 at 5:58 PM, Robert Metzger rmetz...@apache.org wrote:
Therefore, my change will include a configuration option to set a custom location for the file.

On Wed, Aug 26, 2015 at 5:55 PM, Maximilian Michels m...@apache.org wrote:
The only problem with writing to the temp directory is that the file will be gone after a restart. While this is not important for PIDs, because the system has been restarted anyway, it can actually be a problem if you want to resume a YARN cluster after you have restarted your system.

On Wed, Aug 26, 2015 at 3:34 PM, Robert Metzger rmetz...@apache.org wrote:
Yep. I think the start-*.sh scripts are also writing the PID to tmp.

On Wed, Aug 26, 2015 at 3:30 PM, Maximilian Michels m...@apache.org wrote:
Can't we write the file to the system's temp directory or the user home? IMHO this is more standard practice for this type of session information.

On Wed, Aug 26, 2015 at 3:25 PM, Robert Metzger rmetz...@apache.org wrote:
Great ;) Not yet, but you are the second user to request this. I think I'll put the file somewhere else now.

On Wed, Aug 26, 2015 at 3:19 PM, LINZ, Arnaud al...@bouyguestelecom.fr wrote:
Ooops… Seems it was rather a write problem on the conf dir… Sorry, it works! BTW, it's not really nice to have an application write to the configuration dir; it's often a root-protected directory under /usr/lib/flink. Is there a parameter to put that file elsewhere?

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Wednesday, August 26, 2015 14:42
To: user@flink.apache.org
Subject: Re: [0.10-SNAPSHOT] When naming yarn application (yarn-session -nm), flink run without -m fails.

Hi Arnaud, usually you don't have to specify the JobManager address manually with the -m argument, because flink run reads it from the conf/.yarn-session.properties file. Give me a few minutes to reproduce the issue.

On Wed, Aug 26, 2015 at 2:39 PM, LINZ, Arnaud al...@bouyguestelecom.fr wrote:
Hi, using the last nightly build, it seems that if you call yarn-session.sh with the -nm option to give the application a nice name, then you cannot submit a job with flink run without specifying the ever-changing -m JobManager address, since it is no longer found. Regards, Arnaud
Re: Flink YARN Client requested shutdown in flink -m yarn-cluster mode?
Is the log from 0.9-SNAPSHOT or 0.10-SNAPSHOT? Can you send me (privately, if you want) the full log of the YARN application: yarn logs -applicationId appId. We need to find out why the TaskManagers are shutting down. That is most likely logged in the TaskManager logs.

On Fri, Aug 28, 2015 at 10:57 AM, LINZ, Arnaud al...@bouyguestelecom.fr wrote:
Hello,
I've moved my version up from 0.9.0 and tried both 0.9-SNAPSHOT and 0.10-SNAPSHOT to continue my batch execution on my secured cluster, thanks to [FLINK-2555]. My application works nicely in local mode and also in YARN mode using a job container started with yarn-session.sh, but it fails in -m yarn-cluster mode. The YARN logs indicate that "Flink YARN Client requested shutdown", but I did nothing like that (or not intentionally). The nodes are not even starting, and exec() does not return any JobExecutionResult.
My command line was: flink run -m yarn-cluster -yd -yn 2 -ytm 1500 -yqu default -ys 4 --class myMainClass myJar some options
Any idea what I've done wrong?
Greetings, Arnaud

PS - YARN log extract:
(…)
09:56:29,111 INFO org.apache.flink.yarn.YarnTaskManager - Successful registration at JobManager (akka.tcp://flink@172.19.115.51:54806/user/jobmanager), starting network stack and library cache.
09:56:29,817 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful initialization (took 73 ms).
09:56:29,889 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful initialization (took 55 ms). Listening on SocketAddress /172.19.115.52:41920.
09:56:29,890 INFO org.apache.flink.yarn.YarnTaskManager - Determined BLOB server address to be /172.19.115.51:38505. Starting BLOB cache.
09:56:29,893 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-7150f7d7-f7a3-4c4c-9cda-3877da5aacd6
09:56:52,367 INFO org.apache.flink.yarn.YarnTaskManager - Received task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) - FlatMap (FlatMap at readTable(HiveDAO.java:107)) - Map (Key Extractor 1) (1/3)
09:56:52,375 INFO org.apache.flink.yarn.YarnTaskManager - Received task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) - FlatMap (FlatMap at readTable(HiveDAO.java:107)) - Map (Key Extractor 1) (3/3)
09:56:52,383 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) - FlatMap (FlatMap at readTable(HiveDAO.java:107)) - Map (Key Extractor 1) (1/3)
09:56:52,387 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) - FlatMap (FlatMap at readTable(HiveDAO.java:107)) - Map (Key Extractor 1) (3/3)
09:56:52,394 INFO org.apache.flink.yarn.YarnTaskManager - Received task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) - FlatMap (FlatMap at readTable(HiveDAO.java:107)) - Map (Key Extractor 2) (1/3)
09:56:52,402 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) - FlatMap (FlatMap at readTable(HiveDAO.java:107)) - Map (Key Extractor 2) (1/3)
09:56:52,425 INFO org.apache.flink.yarn.YarnTaskManager - Received task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) - FlatMap (FlatMap at readTable(HiveDAO.java:107)) - Map (Key Extractor 2) (2/3)
09:56:52,429 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) - FlatMap (FlatMap at readTable(HiveDAO.java:107)) - Map (Key Extractor 2) (2/3)
09:56:52,454 INFO org.apache.flink.yarn.YarnTaskManager - Stopping YARN TaskManager with final application status FAILED and diagnostics: Flink YARN Client requested shutdown
09:56:52,480 INFO org.apache.flink.yarn.YarnTaskManager - Stopping TaskManager akka://flink/user/taskmanager#2116513584.
09:56:52,483 INFO org.apache.flink.yarn.YarnTaskManager - Cancelling all computations and
Re: Java 8 and type erasure
Hey Kristoffer, sorry for the late reply. I was on vacation. Here you can find my initial email, which also contains a description and a link to the patch: http://mail.openjdk.java.net/pipermail/compiler-dev/2015-January/009220.html The Eclipse JDT team didn't really need a patch. Their compiler does not throw away generic type information in early stages, so it was easy to add it to the generic signature (which accidentally happened at the beginning and now works again with the help of a compiler option): https://bugs.eclipse.org/bugs/show_bug.cgi?id=449063 The OpenJDK/Oracle JDK compiler mainly works without generic type information (it calls erasure() intentionally), so the produced class files do not contain information that could be recovered by any hack. It would be great if we could convince them. I think many projects need more type safety for lambda expressions.

On 18.08.2015 14:58, Stephan Ewen wrote:
Timo should still have the patch! If you want to revive the thread, that'd be great. I'd be happy to support it.

On Tue, Aug 18, 2015 at 2:51 PM, Kristoffer Sjögren sto...@gmail.com wrote:
Do you have a link to these patches? Reading through the thread, I get the feeling they didn't reject the idea completely. Considering there are also other projects (Crunch, Spark, Storm, etc.) that would benefit from this, maybe we can convince them together?

On Tue, Aug 18, 2015 at 2:27 PM, Stephan Ewen se...@apache.org wrote:
Yep, Timo wrote the patch, both for OpenJDK and JDT - the JDT one was accepted in the end.

On Tue, Aug 18, 2015 at 2:09 PM, Robert Metzger rmetz...@apache.org wrote:
Exactly, Timo opened the thread.

On Tue, Aug 18, 2015 at 2:04 PM, Kristoffer Sjögren sto...@gmail.com wrote:
Yeah, I think I found the thread already... by Timo Walther?

On Tue, Aug 18, 2015 at 2:01 PM, Stephan Ewen se...@apache.org wrote:
Would have been great. I had high hopes when I saw the trick with the constant pool, but this only makes what Flink already does applicable to non-serializable lambdas. If you want to help us with this, I'll ping you for some support on the OpenJDK mailing list ;-)

On Tue, Aug 18, 2015 at 1:59 PM, Kristoffer Sjögren sto...@gmail.com wrote:
I suspected that you had already looked into this, but it was worth a try. It would make everything so much easier. Thanks for the explanation :-)

On Tue, Aug 18, 2015 at 1:50 PM, Stephan Ewen se...@apache.org wrote:
Hi Kristoffer! I looked through the code as well. In fact, Flink currently uses the trick mentioned for serializable lambdas in the gist you sent me. This works well for lambdas that return simple types (primitives or classes without generics). The information for the generic parametrization is unfortunately really erased; it is not in any signature or anywhere else. Java has the concept of generic method signatures, which means that a method gets a signature string that includes the generic types. These signatures are generated for regular functions, but OpenJDK and Oracle JDK do not generate them for synthetic methods (like lambdas). We tried to submit a patch to OpenJDK to add these generic signatures to lambda methods, but they did not like the fact that we try to figure out the generic types of lambdas. I hope they change their minds at some point...
Stephan

On Tue, Aug 18, 2015 at 11:46 AM, Aljoscha Krettek aljos...@apache.org wrote:
Unfortunately, this also doesn't work, for the same reasons. The generic types of the generic parameters of a lambda are not stored anywhere. Stephan mentioned to me that the only possibility right now would be to look at the code using something like ASM to find a cast in the code to the concrete type of the generic parameter.

On Tue, 18 Aug 2015 at 11:35 Kristoffer Sjögren sto...@gmail.com wrote:
How about https://github.com/jhalterman/typetools?

On Tue, Aug 18, 2015 at 11:16 AM, Aljoscha Krettek aljos...@apache.org wrote:
Hi Kristoffer, I'm afraid not, but maybe Timo has some further information. In this extended
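To make the erasure problem discussed above concrete, here is a minimal sketch of how it typically surfaces for Flink users with Java 8 lambdas: the generic parameters of the lambda's return type are not in the class file, so a type hint has to be supplied explicitly. The string-based returns(...) hint reflects the Java DataSet API of that era; the little job itself is made up purely for illustration.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class LambdaTypeHintExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> words = env.fromElements("flink", "lambda", "erasure");

            // The synthetic method generated for the lambda carries no generic
            // signature, so Flink cannot infer Tuple2<String, Integer> on its own.
            // An explicit type hint via returns(...) fills the gap.
            DataSet<Tuple2<String, Integer>> counts = words
                    .map(word -> new Tuple2<>(word, 1))
                    .returns("Tuple2<String, Integer>");

            counts.print();
        }
    }

With an anonymous inner class instead of the lambda, the hint is unnecessary, since the generic signature survives compilation; that is exactly the asymmetry the OpenJDK patch discussed above tried to remove.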
Re: Event time in Flink streaming
Hi Martin, you need to implement your own policy. However, this should not be complicated. Have a look at TimeTriggerPolicy. You just need to provide a Timestamp implementation that extracts your ts-attribute from the tuples. -Matthias

On 08/28/2015 03:58 PM, Martin Neumann wrote:
Hej, I have a stream of timestamped events I want to process in Flink streaming. Do I have to write my own policies to do so, or can I define time-based windows that use the timestamps instead of the system time? cheers Martin
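For illustration, a minimal sketch of the Timestamp implementation Matthias mentions. This assumes the 0.9 streaming helper interface Timestamp<T> with a single getTimestamp(value) method and its org.apache.flink.streaming.api.windowing.helper package, which is my reading of that API and may differ in detail; the event type Tuple2<Long, String> with the event time in the first field is made up for the example.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.windowing.helper.Timestamp;

    // Extracts the event's own time field (f0, in milliseconds) so that
    // time-based windows are driven by event time rather than the system clock.
    public class EventTimestamp implements Timestamp<Tuple2<Long, String>> {

        @Override
        public long getTimestamp(Tuple2<Long, String> value) {
            return value.f0;
        }
    }

If I read the 0.9 helpers correctly, such an extractor can then be plugged into a time-based window definition (e.g. via a Time.of(...) overload that accepts a Timestamp) instead of the default system-time behavior.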
Best way for simple logging in jobs?
Hi, I am wondering if it's possible to get my own logs inside the job functions (sources, mappers, sinks…). It would be nice if I could get those logs into YARN's logs, but writing to System.out/System.err has no effect. For now I'm using a "StringBuffer" accumulator, but it does not work in streaming apps before v0.10 and only shows results at the end. I'll probably end up using an HDFS logging system, but maybe there is a smarter way? Greetings, Arnaud
Event time in Flink streaming
Hej, I have a stream of timestamped events I want to process in Flink streaming. Do I have to write my own policies to do so, or can I define time-based windows that use the timestamps instead of the system time? cheers Martin
RE: Flink YARN Client requested shutdown in flink -m yarn-cluster mode?
Hi Robert, as seen together, my mistake was to launch the job in detached mode (-yd) while my main function was not waiting after execution and ended immediately. Sorry for my misunderstanding of this option. Best regards, Arnaud

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Friday, August 28, 2015 11:03
To: user@flink.apache.org
Subject: Re: Flink YARN Client requested shutdown in flink -m yarn-cluster mode?

Is the log from 0.9-SNAPSHOT or 0.10-SNAPSHOT? Can you send me (privately, if you want) the full log of the YARN application: yarn logs -applicationId appId. We need to find out why the TaskManagers are shutting down. That is most likely logged in the TaskManager logs.

On Fri, Aug 28, 2015 at 10:57 AM, LINZ, Arnaud al...@bouyguestelecom.fr wrote: (…)
Re: Flink YARN Client requested shutdown in flink -m yarn-cluster mode?
Hi, no problem. The behavior is not documented, and I also needed some time to figure this out ;) I'm already preparing a pull request to add a note to the documentation.

On Fri, Aug 28, 2015 at 4:41 PM, LINZ, Arnaud al...@bouyguestelecom.fr wrote: (…)
Re: Event time in Flink streaming
Hi Martin, the answer is: it depends, because the current windowing implementation has some problems. We are working on improving it for the 0.10 release, though. If your elements arrive with strictly increasing timestamps and you have parallelism=1, or you don't perform any re-partitioning of data (which a groupBy() does, for example), then what Matthias proposed works for you. If not, then you can get into problems with out-of-order elements, and windows will be determined incorrectly.

If you are interested in what we are working on for 0.10, please look at the design documents here https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams and here https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams. The basic idea is to make windows work correctly when elements do not arrive ordered by timestamp. For this we want to use watermarks as popularized, for example, by Google Dataflow. Please ask if you have questions about this or are interested in joining the discussion (the design is not yet finalized, both API and implementation). :D

Cheers, Aljoscha

P.S. I have some proof-of-concept work in a branch of mine; if you are interested in my work there, I could give you access to it.

On Fri, 28 Aug 2015 at 16:11 Matthias J. Sax mj...@informatik.hu-berlin.de wrote: (…)
Re: Event time in Flink streaming
The stream consists of logs from different machines with synchronized clocks. As a result, the timestamps are not strictly increasing, but there is a bound on how much out of order they can be. (One aim is to detect events that go out of order by more than a certain amount, indicating some problem in the system setup.) I will look at the example policies and see if I can find a way to make it work with 0.9.

I am aware of Google Dataflow and the discussion on Flink, though I only recently learned more about the field, so I didn't have too much useful to say. This might change once I get some more experience with the use case I'm working on.

cheers Martin

On Fri, Aug 28, 2015 at 5:06 PM, Aljoscha Krettek aljos...@apache.org wrote: (…)
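As a rough illustration of the detection goal Martin describes (this is not an API from the thread, just a sketch with a made-up event type and field names): one can track the largest timestamp seen so far in a user function and flag events that lag behind it by more than the allowed bound. Note that the state is local to each parallel subtask, so with parallelism > 1 the check only sees its own partition of the stream.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical event type; in a real job this would be the parsed log record.
    class LogEvent {
        public String host;
        public long timestampMillis;
    }

    public class OutOfOrderDetector extends RichFlatMapFunction<LogEvent, String> {

        private final long maxOutOfOrderMillis;
        private long maxSeenTimestamp = Long.MIN_VALUE;

        public OutOfOrderDetector(long maxOutOfOrderMillis) {
            this.maxOutOfOrderMillis = maxOutOfOrderMillis;
        }

        @Override
        public void flatMap(LogEvent event, Collector<String> out) {
            // Emit a warning if this event is further behind the newest timestamp
            // seen so far than the configured bound allows.
            if (maxSeenTimestamp != Long.MIN_VALUE
                    && event.timestampMillis < maxSeenTimestamp - maxOutOfOrderMillis) {
                out.collect("Out-of-order event from " + event.host + ": "
                        + (maxSeenTimestamp - event.timestampMillis) + " ms behind");
            }
            maxSeenTimestamp = Math.max(maxSeenTimestamp, event.timestampMillis);
        }
    }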
Re: Best way for simple logging in jobs?
Hi, creating an slf4j logger like this:

private static final Logger LOG = LoggerFactory.getLogger(PimpedKafkaSink.class);

works for me. The messages also end up in the regular YARN logs. System.out should actually end up in the YARN logs as well (when retrieving the logs from the log aggregation). Regards, Robert

On Fri, Aug 28, 2015 at 3:55 PM, LINZ, Arnaud al...@bouyguestelecom.fr wrote: (…)
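To make that concrete, a minimal sketch of the pattern Robert describes, applied to a hypothetical RichMapFunction (class name and log message are made up): the static slf4j logger is resolved by the logging configuration shipped with the TaskManagers, so the output shows up in the aggregated YARN logs.

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingMapper extends RichMapFunction<String, String> {

        // Static slf4j logger, as in Robert's snippet; it is picked up by the
        // log4j/logback configuration of the Flink TaskManagers.
        private static final Logger LOG = LoggerFactory.getLogger(LoggingMapper.class);

        @Override
        public String map(String value) {
            LOG.info("Processing record: {}", value);
            return value.toUpperCase();
        }
    }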