Re: [0.10-SNAPSHOT] When naming yarn application (yarn-session -nm), flink run without -m fails.

2015-08-28 Thread Robert Metzger
Just as a quick update on this: The change has been merged into
0.10-SNAPSHOT.
Flink is now writing the jobmanager connection information into the temp
directory.
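
For illustration, the resulting workflow could look like the sketch below. The option name yarn.properties-file.location is believed to match the merged change but should be verified against your build; flags other than -nm are illustrative:

yarn-session.sh -n 2 -nm MyNiceName
# writes the JobManager connection info to a properties file in the
# temp directory (previously: the conf dir)

flink run myJar.jar
# picks the JobManager address up from that file, so -m is not needed

# optional, in flink-conf.yaml: relocate the file, e.g. off /tmp so it
# survives a reboot
yarn.properties-file.location: /var/lib/flink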

On Wed, Aug 26, 2015 at 6:00 PM, Maximilian Michels m...@apache.org wrote:

 Nice. More configuration options :)

 On Wed, Aug 26, 2015 at 5:58 PM, Robert Metzger rmetz...@apache.org
 wrote:
  Therefore, my change will include a configuration option to set a custom
  location for the file.
 
  On Wed, Aug 26, 2015 at 5:55 PM, Maximilian Michels m...@apache.org
 wrote:
 
  The only problem with writing to the temp directory is that the file will
  be gone after a restart. While this is not important for PIDs because the
  system has been restarted anyway, it can actually be a problem if you want
  to resume a YARN cluster after you have restarted your system.
 
  On Wed, Aug 26, 2015 at 3:34 PM, Robert Metzger rmetz...@apache.org
  wrote:
   Yep. I think the start-*.sh scripts are also writing the PID to tmp.
  
   On Wed, Aug 26, 2015 at 3:30 PM, Maximilian Michels m...@apache.org
   wrote:
  
   Can't we write the file to the system's temp directory or the user
   home? IMHO that is more standard practice for this type of session
   information.
  
   On Wed, Aug 26, 2015 at 3:25 PM, Robert Metzger rmetz...@apache.org
 
   wrote:
Great ;)
   
Not yet, but you are the second user to request this.
I think I'll put the file somewhere else now.
   
On Wed, Aug 26, 2015 at 3:19 PM, LINZ, Arnaud
al...@bouyguestelecom.fr
wrote:
   
Oops… Seems it was rather a write problem on the conf dir…
   
Sorry, it works!
   
   
   
BTW, it’s not really nice to have an application write to the configuration
dir; it’s often a root-protected directory under /usr/lib/flink. Is there a
parameter to put that file elsewhere?
   
   
   
   
   
From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Wednesday, August 26, 2015, 14:42
To: user@flink.apache.org
Subject: Re: [0.10-SNAPSHOT] When naming yarn application (yarn-session
-nm), flink run without -m fails.
   
   
   
Hi Arnaud,
   
   
   
Usually you don't have to specify the JobManager address manually with
the -m argument, because the client reads it from the
conf/.yarn-session.properties file.
   
   
   
Give me a few minutes to reproduce the issue.
   
   
   
On Wed, Aug 26, 2015 at 2:39 PM, LINZ, Arnaud
al...@bouyguestelecom.fr
wrote:
   
Hi,
Using the latest nightly build, it seems that if you call yarn-session.sh
with the -nm option to give the application a nice name, then you can no
longer submit a job with flink run without specifying the ever-changing
-m JobManager address, since the client does not find it any longer.
   
Regards,
   
Arnaud
   
   
   

   
   
   
   
   
   
  
  
 
 



Re: Flink YARN Client requested shutdown in flink -m yarn-cluster mode?

2015-08-28 Thread Robert Metzger
Is the log from 0.9-SNAPSHOT or 0.10-SNAPSHOT?

Can you send me (privately, if you prefer) the full log of the YARN
application:

yarn logs -applicationId <appId>

We need to find out why the TaskManagers are shutting down. That is most
likely logged in the TaskManager logs.
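
If the aggregated log is large, something along these lines (appId
placeholder as above) can narrow it down to the shutdown entries:

yarn logs -applicationId <appId> | grep -B 2 -A 2 "Stopping YARN TaskManager"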


On Fri, Aug 28, 2015 at 10:57 AM, LINZ, Arnaud al...@bouyguestelecom.fr
wrote:

 Hello,



 I’ve moved my version from 0.9.0 and tried both 0.9-SNAPSHOT and
 0.10-SNAPSHOT to continue my batch execution on my secured cluster thanks
 to [FLINK-2555].

 My application works nicely in local mode and also in YARN mode using a
 job container started with yarn-session.sh, but it fails in -m
 yarn-cluster mode.



 Yarn logs indicate “Flink YARN Client requested shutdown”, but I did
 nothing like that (or not intentionally). The nodes do not even start, and
 exec() does not return any JobExecutionResult.



 My command line was:

 flink run -m yarn-cluster -yd -yn 2 -ytm 1500 -yqu default -ys 4 --class
 myMainClass myJar some options



 Any idea what I’ve done wrong?



 Greetings,

 Arnaud



 PS - Yarn log extract:

 (…)

 09:56:29,111 INFO  org.apache.flink.yarn.YarnTaskManager - Successful registration at JobManager (akka.tcp://flink@172.19.115.51:54806/user/jobmanager), starting network stack and library cache.

 09:56:29,817 INFO  org.apache.flink.runtime.io.network.netty.NettyClient - Successful initialization (took 73 ms).

 09:56:29,889 INFO  org.apache.flink.runtime.io.network.netty.NettyServer - Successful initialization (took 55 ms). Listening on SocketAddress /172.19.115.52:41920.

 09:56:29,890 INFO  org.apache.flink.yarn.YarnTaskManager - Determined BLOB server address to be /172.19.115.51:38505. Starting BLOB cache.

 09:56:29,893 INFO  org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-7150f7d7-f7a3-4c4c-9cda-3877da5aacd6

 09:56:52,367 INFO  org.apache.flink.yarn.YarnTaskManager - Received task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTable(HiveDAO.java:107)) -> Map (Key Extractor 1) (1/3)

 09:56:52,375 INFO  org.apache.flink.yarn.YarnTaskManager - Received task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTable(HiveDAO.java:107)) -> Map (Key Extractor 1) (3/3)

 09:56:52,383 INFO  org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTable(HiveDAO.java:107)) -> Map (Key Extractor 1) (1/3)

 09:56:52,387 INFO  org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTable(HiveDAO.java:107)) -> Map (Key Extractor 1) (3/3)

 09:56:52,394 INFO  org.apache.flink.yarn.YarnTaskManager - Received task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTable(HiveDAO.java:107)) -> Map (Key Extractor 2) (1/3)

 09:56:52,402 INFO  org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTable(HiveDAO.java:107)) -> Map (Key Extractor 2) (1/3)

 09:56:52,425 INFO  org.apache.flink.yarn.YarnTaskManager - Received task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTable(HiveDAO.java:107)) -> Map (Key Extractor 2) (2/3)

 09:56:52,429 INFO  org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task CHAIN DataSource (at createInput(ExecutionEnvironment.java:502) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> FlatMap (FlatMap at readTable(HiveDAO.java:107)) -> Map (Key Extractor 2) (2/3)

 09:56:52,454 INFO  org.apache.flink.yarn.YarnTaskManager - Stopping YARN TaskManager with final application status FAILED and diagnostics: Flink YARN Client requested shutdown

 09:56:52,480 INFO  org.apache.flink.yarn.YarnTaskManager - Stopping TaskManager akka://flink/user/taskmanager#2116513584.

 09:56:52,483 INFO  org.apache.flink.yarn.YarnTaskManager - Cancelling all computations and 

Re: Java 8 and type erasure

2015-08-28 Thread Timo Walther

Hey Kristoffer,

sorry for the late reply. I was on vacation.

Here you can find my initial email that also contains a description and 
a link to the patch: 
http://mail.openjdk.java.net/pipermail/compiler-dev/2015-January/009220.html


The Eclipse JDT team didn't really need a patch. Their compiler does not 
throw away generic type information in early stages, so it was easy to 
add it to the generic signature (which accidentally happened at the 
beginning and now works again with the help of a compiler option): 
https://bugs.eclipse.org/bugs/show_bug.cgi?id=449063


The OpenJDK/Oracle JDK compiler mainly works without generic type 
information (they call erasure() intentionally), so the produced class 
files do not contain information that could be recovered by any hack.


It would be great if we could convince them. I think many projects need 
more type-safety for lambda expressions.
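
For readers who want to see the difference, a small self-contained Java 8 
demo (nothing Flink-specific; the output in the comments is what a typical 
HotSpot javac build produces and may vary):

import java.util.function.Function;

public class ErasureDemo {
    public static void main(String[] args) {
        // Anonymous class: javac emits a generic signature for the class,
        // so the String/Integer parametrization is recoverable by reflection.
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        // Prints: java.util.function.Function<java.lang.String, java.lang.Integer>
        System.out.println(anon.getClass().getGenericInterfaces()[0]);

        // Lambda: the synthetic class implements only the raw interface,
        // so the parametrization is gone.
        Function<String, Integer> lambda = s -> s.length();
        // Prints: interface java.util.function.Function
        System.out.println(lambda.getClass().getGenericInterfaces()[0]);
    }
}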



On 18.08.2015 14:58, Stephan Ewen wrote:

Timo should still have the patch!

If you want to revive the thread, that'd be great. I'd be happy to 
support it.




On Tue, Aug 18, 2015 at 2:51 PM, Kristoffer Sjögren sto...@gmail.com wrote:


Do you have a link to these patches?

Reading through the thread, I get the feeling they didn't reject the
idea completely.

Considering there are also other projects (Crunch, Spark, Storm, etc)
that would benefit from this, maybe we can convince them together?


On Tue, Aug 18, 2015 at 2:27 PM, Stephan Ewen se...@apache.org wrote:
 Yep, Timo wrote the patch, both for OpenJDK and JDT - the JDT one was
 accepted in the end.


 On Tue, Aug 18, 2015 at 2:09 PM, Robert Metzger rmetz...@apache.org wrote:

 Exactly, Timo opened the thread.

  On Tue, Aug 18, 2015 at 2:04 PM, Kristoffer Sjögren sto...@gmail.com
 wrote:

 Yeah, I think I found the thread already... by Timo Walther?

  On Tue, Aug 18, 2015 at 2:01 PM, Stephan Ewen se...@apache.org wrote:
   Would have been great. I had high hopes when I saw the trick with the
   constant pool, but this is only to make what Flink does already
   applicable to non-serializable lambdas.
 
   If you want to help us with this, I'll ping you for some support on
   the OpenJDK mailing list ;-)
 
   On Tue, Aug 18, 2015 at 1:59 PM, Kristoffer Sjögren sto...@gmail.com
  wrote:
 
   I suspected that you had already looked into this, but it was worth a
   try. It would make everything so much easier.
 
  Thanks for the explanation :-)
 
 
   On Tue, Aug 18, 2015 at 1:50 PM, Stephan Ewen se...@apache.org
  wrote:
   Hi Kristoffer!
  
    I looked through the code as well. In fact, Flink currently uses the
    trick mentioned for Serializable Lambdas in the gist you sent me.
  
    This works well for lambdas that return simple types (primitives or
    classes without generics). The information for the generic
    parametrization is unfortunately really erased; it is not in any
    signature or anywhere else.
  
    Java has the concept of generic method signatures, which means that a
    method gets a signature string that includes the generic types. These
    signatures are generated for regular functions, but OpenJDK and
    OracleJDK do not generate them for synthetic methods (like lambdas).
  
    We tried to submit a patch to OpenJDK to add these generic signatures
    to lambda methods, but they did not like the fact that we try to
    figure out the generic types of lambdas. I hope they change their
    minds at some point...
  
   Stephan
  
  
  
  
    On Tue, Aug 18, 2015 at 11:46 AM, Aljoscha Krettek aljos...@apache.org
    wrote:
  
    Unfortunately, this also doesn't work for the same reasons. The
    generic types of generic parameters of a lambda are not stored
    anywhere. Stephan mentioned to me that the only possibility right now
    would be to look at the code using something like ASM to find a cast
    in the code to the concrete type of the generic parameter.
  
    On Tue, 18 Aug 2015 at 11:35 Kristoffer Sjögren sto...@gmail.com
   wrote:
  
   How about https://github.com/jhalterman/typetools?
  
    On Tue, Aug 18, 2015 at 11:16 AM, Aljoscha Krettek aljos...@apache.org
   wrote:
Hi Kristoffer,
I'm afraid not, but maybe Timo has some further information. In this
extended 

Re: Event time in Flink streaming

2015-08-28 Thread Matthias J. Sax
Hi Martin,

you need to implement your own policy. However, this should not be
complicated. Have a look at TimeTriggerPolicy. You just need to
provide a Timestamp implementation that extracts your ts-attribute from
the tuples.

-Matthias

On 08/28/2015 03:58 PM, Martin Neumann wrote:
 Hej,
 
 I have a stream of timestamped events I want to process in Flink streaming.
 Do I have to write my own policies to do so, or can I define time-based
 windows that use the timestamps instead of the system time?
 
 cheers Martin
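
A minimal sketch of the Timestamp implementation Matthias describes,
assuming the 0.9 streaming API's Timestamp interface; the event type and
its field are made up for illustration:

import org.apache.flink.streaming.api.windowing.helper.Timestamp;

// Hypothetical event type, only here to make the example compile.
class LogEvent {
    final long eventTimeMillis;

    LogEvent(long eventTimeMillis) {
        this.eventTimeMillis = eventTimeMillis;
    }
}

// Extracts the event's own timestamp so that time-based windows use it
// instead of the system clock.
class LogEventTimestamp implements Timestamp<LogEvent> {
    @Override
    public long getTimestamp(LogEvent event) {
        return event.eventTimeMillis;
    }
}

Something like Time.of(5000, new LogEventTimestamp()) would then plug this
into a time-based window; the exact helper signature should be checked
against the 0.9 javadocs.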





Best way for simple logging in jobs?

2015-08-28 Thread LINZ, Arnaud
Hi,



I am wondering if it’s possible to get my own logs inside the job functions 
(sources, mappers, sinks…). It would be nice if I could get those logs into 
YARN’s logs, but writing to System.out/System.err has no effect.



For now I’m using a “StringBuffer” accumulator, but it does not work in 
streaming apps before v0.10, and it only shows results at the end.



I’ll probably end up using an HDFS-based logging system, but maybe there is 
a smarter way?



Greetings,

Arnaud







Event time in Flink streaming

2015-08-28 Thread Martin Neumann
Hej,

I have a stream of timestamped events I want to process in Flink streaming.
Do I have to write my own policies to do so, or can I define time-based
windows that use the timestamps instead of the system time?

cheers Martin


RE: Flink YARN Client requested shutdown in flink -m yarn-cluster mode?

2015-08-28 Thread LINZ, Arnaud
Hi Robert,

As we saw together, my mistake was launching the job in detached mode (-yd) 
while my main function was not waiting after execution and ended 
immediately. Sorry for my misunderstanding of this option.

Best regards,
Arnaud

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Friday, August 28, 2015, 11:03
To: user@flink.apache.org
Subject: Re: Flink YARN Client requested shutdown in flink -m yarn-cluster 
mode?

Is the log from 0.9-SNAPSHOT or 0.10-SNAPSHOT?

Can you send me (privately, if you prefer) the full log of the YARN 
application:

yarn logs -applicationId <appId>

We need to find out why the TaskManagers are shutting down. That is most likely 
logged in the TaskManager logs.



Re: Flink YARN Client requested shutdown in flink -m yarn-cluster mode?

2015-08-28 Thread Robert Metzger
Hi,
no problem. The behavior is not documented and I also needed some time to
figure this out ;)

I'm already preparing a pull request to add a note to the documentation.
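
For readers hitting the same issue, a minimal sketch of the pitfall
(hypothetical job; the comments describe the behavior reported in this
thread, not documented semantics):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class DetachedModePitfall {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .output(new DiscardingOutputFormat<Integer>());
        // With -yd the client detaches on submission: this call returns
        // without waiting for the job and without a JobExecutionResult.
        // If main() then simply ends, the YARN session is torn down while
        // the TaskManagers are still coming up, which produces the FAILED
        // status seen in this thread.
        env.execute("detached-mode example");
    }
}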

On Fri, Aug 28, 2015 at 4:41 PM, LINZ, Arnaud al...@bouyguestelecom.fr
wrote:

 Hi Robert,



  As we saw together, my mistake was launching the job in detached mode
  (-yd) while my main function was not waiting after execution and ended
  immediately. Sorry for my misunderstanding of this option.



 Best regards,

 Arnaud




Re: Event time in Flink streaming

2015-08-28 Thread Aljoscha Krettek
Hi Martin,
the answer depends, because the current windowing implementation has some
problems. We are working on improving it in the 0.10 release, though.

If your elements arrive with strictly increasing timestamps and you have
parallelism=1 or don't perform any re-partitioning of data (which a
groupBy() does, for example), then what Matthias proposed works for you. If
not, then you can get into problems with out-of-order elements, and windows
will be determined incorrectly.

If you are interested in what we are working on for 0.10, please look at
the design documents here
https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
and
here
https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams.
The basic idea is to make windows work correctly when elements do not
arrive ordered by timestamp. For this we want to use watermarks, as
popularized, for example, by Google Dataflow.

Please ask if you have questions about this or are interested in joining
the discussion (the design is not yet finalized, both API and
implementation). :D

Cheers,
Aljoscha

P.S. I have some proof-of-concept work in a branch of mine; if you are
interested in my work there, I could give you access to it.
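
To make the watermark idea concrete, a plain-Java sketch of the bounded
out-of-orderness case (no Flink API involved, just the invariant a
watermark expresses):

// Tracks a watermark for a stream whose elements may be out of order by
// at most maxOutOfOrderness milliseconds.
class BoundedOutOfOrdernessTracker {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessTracker(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called once per event. The returned watermark is the promise that no
    // element with timestamp <= watermark will arrive anymore; a window
    // whose end lies below the watermark can therefore fire.
    long onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        return maxTimestampSeen - maxOutOfOrderness;
    }
}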

On Fri, 28 Aug 2015 at 16:11 Matthias J. Sax mj...@informatik.hu-berlin.de
wrote:

 Hi Martin,

 you need to implement your own policy. However, this should not be
 complicated. Have a look at TimeTriggerPolicy. You just need to
 provide a Timestamp implementation that extracts your ts-attribute from
 the tuples.

 -Matthias

 On 08/28/2015 03:58 PM, Martin Neumann wrote:
  Hej,
 
  I have a stream of timestamped events I want to process in Flink
 streaming.
  Do I have to write my own policies to do so, or can I define time-based
  windows that use the timestamps instead of the system time?
 
  cheers Martin




Re: Event time in Flink streaming

2015-08-28 Thread Martin Neumann
The stream consists of logs from different machines with synchronized
clocks. As a result, timestamps are not strictly increasing, but there is a
bound on how far out of order they can be. (One aim is to detect events
that go out of order by more than a certain amount, indicating some problem
in the system setup.)

I will look at the example policies and see if I can find a way to make it
work with 0.9.

I am aware of Google Dataflow and the discussion on Flink, though I only
recently learned more about the field, so I didn't have too much useful to
say. This might change once I get some more experience with the use case
I'm working on.

cheers Martin

On Fri, Aug 28, 2015 at 5:06 PM, Aljoscha Krettek aljos...@apache.org
wrote:

 Hi Martin,
 the answer depends, because the current windowing implementation has some
 problems. We are working on improving it in the 0.10 release, though.

 If your elements arrive with strictly increasing timestamps and you have
 parallelism=1 or don't perform any re-partitioning of data (which a
 groupBy() does, for example), then what Matthias proposed works for you.
 If not, then you can get into problems with out-of-order elements, and
 windows will be determined incorrectly.

 If you are interested in what we are working on for 0.10, please look at
 the design documents here
 https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
  and
 here
 https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams.
 The basic idea is to make windows work correctly when elements do not
 arrive ordered by timestamp. For this we want to use watermarks, as
 popularized, for example, by Google Dataflow.

 Please ask if you have questions about this or are interested in joining
 the discussion (the design is not yet finalized, both API and
 implementation). :D

 Cheers,
 Aljoscha

 P.S. I have some proof-of-concept work in a branch of mine; if you are
 interested in my work there, I could give you access to it.

 On Fri, 28 Aug 2015 at 16:11 Matthias J. Sax 
 mj...@informatik.hu-berlin.de wrote:

 Hi Martin,

  you need to implement your own policy. However, this should not be
  complicated. Have a look at TimeTriggerPolicy. You just need to
  provide a Timestamp implementation that extracts your ts-attribute from
  the tuples.

 -Matthias

 On 08/28/2015 03:58 PM, Martin Neumann wrote:
  Hej,
 
  I have a stream of timestamped events I want to process in Flink
 streaming.
   Do I have to write my own policies to do so, or can I define time-based
   windows that use the timestamps instead of the system time?
 
  cheers Martin




Re: Best way for simple logging in jobs?

2015-08-28 Thread Robert Metzger
Hi,

Creating an slf4j logger like this:

private static final Logger LOG =
LoggerFactory.getLogger(PimpedKafkaSink.class);

works for me. The messages also end up in the regular YARN logs. System.out
should actually also end up in YARN (when retrieving the logs via log
aggregation).

Regards,

Robert
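
As a complete sketch of the pattern Robert describes, a logger inside a job
function (MyMapper is a hypothetical name; the function runs on the
TaskManagers, so its output lands in the TaskManager/YARN logs rather than
on the client):

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyMapper implements MapFunction<String, String> {
    // slf4j loggers are not serializable; a static field avoids shipping
    // the logger along with the serialized function.
    private static final Logger LOG = LoggerFactory.getLogger(MyMapper.class);

    @Override
    public String map(String value) throws Exception {
        LOG.info("Processing record: {}", value);
        return value.toUpperCase();
    }
}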


On Fri, Aug 28, 2015 at 3:55 PM, LINZ, Arnaud al...@bouyguestelecom.fr
wrote:

 Hi,



  I am wondering if it’s possible to get my own logs inside the job
  functions (sources, mappers, sinks…). It would be nice if I could get
  those logs into YARN’s logs, but writing to System.out/System.err has no
  effect.



  For now I’m using a “StringBuffer” accumulator, but it does not work in
  streaming apps before v0.10, and it only shows results at the end.



  I’ll probably end up using an HDFS-based logging system, but maybe there
  is a smarter way?



 Greetings,

 Arnaud


