Flink doesn't free YARN slots after restarting

2017-08-09 Thread Bowen Li
Hi guys, I was running a Flink job (parallelism 12) on an EMR cluster with 48 YARN slots. When the job started, I could see from the Flink UI that the job took 12 slots, and 36 slots were left available. I would expect that when the job fails, it would restart from a checkpoint by taking another

Different watermarks on keyed stream

2017-08-09 Thread Björn Hedström
Hi, I'm building a small application which reads CSV files from Kafka and passes them to Flink. In Flink I parse these values, which contain an embedded timestamp, and assign watermarks with a BoundedOutOfOrdernessTimestampExtractor. Then I "transform" the stream into a KeyedStream by an ID embedde

Re: Advice on debugging state back end...

2017-08-09 Thread Stefan Richter
Hi, the amount of heap space that the backend consumes can depend heavily on what your job is doing and on the data types you are storing. First, if your job has many windows (e.g. sliding windows can blow up the number of windows Flink has to track) or huge windows (e.g. because of long time
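
To put a number on the sliding-window remark, here is a plain-Java sketch of how one element is assigned to sliding windows. It is modelled on the usual window-start arithmetic rather than copied from Flink, it assumes a window offset of 0 and non-negative timestamps, and the class name is hypothetical. When the size is a multiple of the slide, each element lands in size/slide windows, and each of those windows carries its own state.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java illustration (not Flink's source) of sliding-window assignment.
 * Assumptions: window offset 0 and non-negative timestamps.
 */
final class SlidingWindowCount {

    private SlidingWindowCount() {}

    /** Returns the start of every window [start, start + size) that contains the timestamp. */
    static List<Long> assignWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that can contain this timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, a 1-hour window sliding every minute (size 3_600_000 ms, slide 60_000 ms) yields 60 window starts per element, whereas a tumbling window (size equal to slide) yields exactly one.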

Re: WaterMark & Eventwindow not fired correctly

2017-08-09 Thread aitozi
Hi, Below is my code: splitStream.select(duringTime + "") .map(new KeyMapFunc()) .assignTimestampsAndWatermarks(new DelaySaltWatermarks()) .setParallelism(300) .keyBy(_SQL, _KEY, _SALT) .window(TumblingEventTimeWindo

Re: Different watermarks on keyed stream

2017-08-09 Thread Till Rohrmann
Hi Björn, unfortunately Flink does not support per-key watermarks. Watermarks are always global. One way to solve this problem would be to split your input data into disjoint pieces, where each piece only contains data for one key. You could do this either by creating new Kafka topics or by spl
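
A small plain-Java simulation shows why a single, global watermark hurts when keys carry very different embedded timestamps. It mimics the bounded-out-of-orderness idea (watermark = largest timestamp seen so far minus a fixed bound) but is not Flink's extractor; the class name and the 5-second bound used below are hypothetical.

```java
/**
 * Plain-Java simulation (not Flink code) of one bounded-out-of-orderness
 * watermark shared by all keys: it trails the largest timestamp seen so far,
 * no matter which key produced that timestamp.
 */
final class GlobalWatermarkDemo {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    GlobalWatermarkDemo(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Records an event timestamp; returns true if it is at or behind the current watermark. */
    boolean onEvent(long timestamp) {
        boolean late = timestamp <= currentWatermark();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return late;
    }

    /** Largest timestamp seen minus the allowed out-of-orderness (MIN_VALUE before any event). */
    long currentWatermark() {
        return currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

With a bound of 5_000 ms, an event for key A at 100_000 moves the watermark to 95_000, after which an event for key B at 1_000 is reported as late, even if it is perfectly on time from key B's point of view. Splitting the input per key, as suggested above, would give each piece its own assigner and therefore its own watermark.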

Re: Flink doesn't free YARN slots after restarting

2017-08-09 Thread Till Rohrmann
Hi Bowen, if I'm not mistaken, Flink's current YARN implementation does not actively release containers. The `YarnFlinkResourceManager` is started with a fixed number of containers that it always tries to acquire. If a container dies, it will request a new one. In case of a failure al

Re: Task slot data

2017-08-09 Thread Till Rohrmann
Hi Govindarajan, every task is executed by exactly one thread. Thus, it should be fine to use thread-local variables. However, be aware that a single task can execute multiple operators if they are chained. Cheers, Till On Wed, Aug 9, 2017 at 6:49 AM, Govindarajan Srinivasaraghavan < govindragh..
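
The following plain-Java illustration (no Flink classes; all names are hypothetical) mimics that situation: each "task" runs on its own thread, so a ThreadLocal is private to it, while two operators chained inside the same task share the same ThreadLocal value.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java illustration: one thread per "task", with two chained
 * "operators" executed on that thread and touching the same ThreadLocal.
 */
final class ThreadLocalPerTask {

    private ThreadLocalPerTask() {}

    // Hypothetical per-thread scratch buffer used by both operators below.
    private static final ThreadLocal<List<String>> SCRATCH =
            ThreadLocal.withInitial(ArrayList::new);

    /** First operator in the chain: records that it processed the record. */
    static void mapOperator(String record) {
        SCRATCH.get().add("map:" + record);
    }

    /** Chained operator: same thread, so it also sees the entry added by mapOperator. */
    static int filterOperator(String record) {
        SCRATCH.get().add("filter:" + record);
        return SCRATCH.get().size();
    }

    /** Runs one task (map chained to filter) on a fresh thread; returns the buffer size it observed. */
    static int runTask(String record) {
        int[] observed = new int[1];
        Thread task = new Thread(() -> {
            mapOperator(record);
            observed[0] = filterOperator(record);
        });
        task.start();
        try {
            task.join(); // join() also makes observed[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed[0];
    }

    /** Size of the calling thread's own buffer, untouched by the task threads. */
    static int callerBufferSize() {
        return SCRATCH.get().size();
    }
}
```

runTask returns 2 for every task: the map and filter operators of one task write into the same buffer, but each new task thread starts from an empty one. If chained operators must not see each other's data, one option is to give each operator its own ThreadLocal field.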

Re: Getting JobManager address and port within a running job

2017-08-09 Thread Aljoscha Krettek
Hi, how are you specifying the port? I ask because in YARN mode the config setting is different: yarn.application-master.port. Also, you cannot specify the IP of the JobManager in YARN mode because Flink has no influence over how YARN schedules the JobManager, so it could run on any machine i

Re: Can't find correct JobManager address, job fails with Queryable state

2017-08-09 Thread Aljoscha Krettek
Hi, I'm afraid you are running into roughly this problem: https://issues.apache.org/jira/browse/FLINK-6689. It is no longer possible to connect to a LocalFlinkMiniCluster, either via a RemoteExecutionEnvironment or via the queryable state clie

Re: Getting JobManager address and port within a running job

2017-08-09 Thread Biplob Biswas
Hi Aljoscha, I was expecting that I could set the JobManager address and port in the configuration and pass it to the execution environment, but later learned that this was the wrong approach. My motivation for accessing the JobManager coordinates was to set up a queryablestateclien

Re: Can't find correct JobManager address, job fails with Queryable state

2017-08-09 Thread Biplob Biswas
Thanks Aljoscha, this clarification probably ends my search for a way to access local state from within the same job. Thanks for the help :)

Re: Operations dependencies between values with different key in a ConnectedStreams

2017-08-09 Thread Aljoscha Krettek
Hi Gabriele, Yes, something like this is possible with Flink. However, you have to implement a two-stage approach for this that I would roughly call "scatter-gather". You have three operators: input -> Scatter -> State -> Gather -> output, where the "Scatter" analyses what state you need fo
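
The message is cut off before the details, so the following is only a single-threaded plain-Java sketch of the general scatter-gather shape, not Aljoscha's actual design. The data model (a request that needs the values stored under several keys, combined by summing) and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Single-threaded sketch of input -> Scatter -> State -> Gather -> output. */
final class ScatterGatherSketch {

    /** Scatter output: request #requestId needs the value under key (one of `parts` lookups). */
    static final class Lookup {
        final long requestId;
        final String key;
        final int parts;

        Lookup(long requestId, String key, int parts) {
            this.requestId = requestId;
            this.key = key;
            this.parts = parts;
        }
    }

    /** State output: the value found for one lookup. */
    static final class Reply {
        final long requestId;
        final int value;
        final int parts;

        Reply(long requestId, int value, int parts) {
            this.requestId = requestId;
            this.value = value;
            this.parts = parts;
        }
    }

    private final Map<String, Integer> valueByKey = new HashMap<>();   // "State" stage
    private final Map<Long, int[]> partialByRequest = new HashMap<>(); // "Gather": {sum, replies}

    void put(String key, int value) {
        valueByKey.put(key, value);
    }

    /** Scatter: fan one input out into one lookup per key it depends on. */
    static List<Lookup> scatter(long requestId, List<String> neededKeys) {
        List<Lookup> lookups = new ArrayList<>();
        for (String key : neededKeys) {
            lookups.add(new Lookup(requestId, key, neededKeys.size()));
        }
        return lookups;
    }

    /** State: answer a lookup from the state stored for that key (missing keys count as 0). */
    Reply state(Lookup lookup) {
        return new Reply(lookup.requestId, valueByKey.getOrDefault(lookup.key, 0), lookup.parts);
    }

    /** Gather: returns the combined result once all replies for a request arrived, else null. */
    Integer gather(Reply reply) {
        int[] acc = partialByRequest.computeIfAbsent(reply.requestId, id -> new int[2]);
        acc[0] += reply.value;
        acc[1] += 1;
        if (acc[1] < reply.parts) {
            return null;
        }
        partialByRequest.remove(reply.requestId);
        return acc[0];
    }

    /** Pushes one request through the three stages in sequence. */
    Integer process(long requestId, List<String> neededKeys) {
        Integer result = null;
        for (Lookup lookup : scatter(requestId, neededKeys)) {
            result = gather(state(lookup));
        }
        return result;
    }

    int pendingRequests() {
        return partialByRequest.size();
    }
}
```

In an actual job, the State stage would presumably be keyed by the state key and the Gather stage by the request id, so that each stage can run in parallel; the sketch simply calls the three stages one after another.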

Re: Getting JobManager address and port within a running job

2017-08-09 Thread Aljoscha Krettek
Hi, I don't think Queryable State is good for that use case because it can lead to inconsistencies. However, I think my answer here and the linked presentation might be helpful for your use case: https://lists.apache.org/thread.html/3907cd6433c9c066126ce150cb2fdcc298d366eb55bca890be716020@%3Cus

Re: WaterMark & Eventwindow not fired correctly

2017-08-09 Thread Aljoscha Krettek
Hi, So when the parallelism of the timestamp assigner is different from the parallelism of the map(KeyMapFunc()) or the window, then it works? But when the parallelism is the same, it does not work? If so, I would assume that some parallel instances of the timestamp assigner don'
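
The excerpt stops before the explanation, so the following only illustrates, as an assumption, one well-known way watermarks stall: an operator with several upstream instances advances its event time to the minimum of the watermarks received from them, so an assigner instance that never emits a watermark (for example because it gets no data) keeps downstream event-time windows from firing. This is a plain-Java simulation, not Flink code, and the class name is hypothetical.

```java
import java.util.Arrays;

/**
 * Plain-Java simulation: the operator's watermark is the minimum over the
 * watermarks of its upstream channels.
 */
final class MinWatermarkDemo {

    private final long[] channelWatermarks;

    MinWatermarkDemo(int upstreamParallelism) {
        channelWatermarks = new long[upstreamParallelism];
        // A channel that has not emitted anything yet is treated as "no progress".
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one upstream instance; per-channel watermarks only move forward. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** Combined watermark: the minimum over all channels. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /** An event-time window [start, windowEnd) fires once the watermark reaches windowEnd - 1. */
    boolean windowFires(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }
}
```

With three upstream channels, two of which report 10_000 while the third stays silent, the combined watermark remains Long.MIN_VALUE and a window ending at 5_000 never fires; as soon as the third channel also reports 10_000, it does.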

Re: Writing on Cassandra

2017-08-09 Thread Aljoscha Krettek
Hi, In the Cassandra section of the documentation there is actually an example: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html. In a Flink job you would therefore roughly

Re: Logback user class

2017-08-09 Thread Aljoscha Krettek
Hi, It seems there is a bug in the internal Table API operators: they store the Logger that is available on the Client machine and deserialising that Logger fails on the cluster. I created a Jira issue for this: https://issues.apache.org/jira/browse/FLINK-7398

Re: Use processing time and time window flink acts like batch mode.

2017-08-09 Thread Aljoscha Krettek
Hi, Could you please clarify what the expected and the actual outputs are? Also, could you maybe post the snippet that produces the log output? Best, Aljoscha > On 12. Jul 2017, at 15:32, yunfan123 wrote: > > I using processing time and the data source comes from kafka. > My code is like follow

Re: Flink - Handling late events - main vs late window firing

2017-08-09 Thread Aljoscha Krettek
Hi, 1. You could use a ProcessWindowFunction instead of a WindowFunction. In there, you can query the current watermark and thus determine why the firing is happening. Also, in a ProcessWindowFunction you can keep per-window state; this would allow you to keep a bit of state that can tell you w
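
Point 1 can be mirrored outside Flink in a few lines of plain Java (a sketch, not Flink's ProcessWindowFunction API; names are hypothetical): compare the current watermark with the window end, and keep a per-window flag recording whether the main firing has already happened. It assumes the default event-time behaviour, where a window fires once the watermark reaches its last timestamp (end - 1) and may fire again when late elements arrive within the allowed lateness.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch: classify each firing of an event-time window as early, main or late. */
final class FiringClassifier {

    enum Firing { EARLY, MAIN, LATE }

    // Stand-in for per-window state: window end timestamp -> "main firing already happened".
    private final Map<Long, Boolean> mainFired = new HashMap<>();

    /** Classifies one firing of the window ending (exclusively) at windowEnd. */
    Firing classify(long windowEnd, long currentWatermark) {
        // Before the watermark reaches the window's last timestamp, only a custom trigger can fire.
        if (currentWatermark < windowEnd - 1) {
            return Firing.EARLY;
        }
        // The first firing at or after the watermark is the main one; any later one is late.
        boolean alreadyFired = mainFired.getOrDefault(windowEnd, false);
        mainFired.put(windowEnd, true);
        return alreadyFired ? Firing.LATE : Firing.MAIN;
    }
}
```

Early firings deliberately do not set the flag, so a window with an early firing still gets its main firing classified correctly afterwards.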

Re: queryable state vs. writing result back to Kafka

2017-08-09 Thread Aljoscha Krettek
One advantage is that you don't need an external system for storing results if they are readily available in Flink. There are also downsides, however. For example, queryable state is not available if the Job is suspended or restarting. Generally, I think this depends on the use case. Best, Aljo

Re: k8s FileNotFoundException

2017-08-09 Thread Aljoscha Krettek
Is the file also available on the TaskManager containers? In fact, they are the ones doing the actual reading. In general, I would suggest using a distributed file system as input for a Flink job if you want to process files. Best, Aljoscha > On 4. Aug 2017, at 14:13, Kaepke, Marc wrote: > > Hi ever

Re: Getting JobManager address and port within a running job

2017-08-09 Thread Biplob Biswas
Hi Aljoscha, Thanks for the link. I read through it, but I still can't see how to implement something similar for my use case. I explained my use case to Fabian in a previous post; I will try to be as clear as possible again. So, my use case, in short, is something like this: 1) From input str

Re: Using Hadoop 2.8.0 in Flink Project for S3A Path Style Access

2017-08-09 Thread Aljoscha Krettek
So you're saying that this works if you manually compile Flink for Hadoop 2.8.0? If yes, I think the solution is that we have to provide binaries for Hadoop 2.8.0. If we did that with a possible Flink 1.3.3 release and starting from Flink 1.4.0, would this be an option for you? Best, Aljoscha >

Re: Watermarking and Timestamp on Kafka stream union

2017-08-09 Thread Aljoscha Krettek
Hi, You would apply an assignTimestampsAndWatermarks() operation on the unioned stream. However, this would not allow you to sort the join by timestamp, since Flink does not support sorting out of the box. Best, Aljoscha > On 26. Jul 2017, at 21:37, G.S.Vijay Raajaa wrote: > > HI, > > I am hav

Re: Can i use lot of keyd states or should i use 1 big key state.

2017-08-09 Thread Aljoscha Krettek
Hi, If you have one keyed state, say "count by email id", and many different keys you will only have one column in RocksDB (or one HashTable). Actually, a lot of users have hundreds of millions of different keys for some states. Best, Aljoscha > On 2. Aug 2017, at 14:59, shashank agarwal wrot
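
As a rough mental model (plain Java, not Flink's backend code; names are hypothetical), each declared keyed state behaves like one table, and every distinct key is just another entry in that table, so many email ids in one "count by email id" state still mean a single table.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model: one table per declared keyed state, one entry per distinct key. */
final class KeyedStateTables {

    // state name -> (key -> value); the outer map has one entry per declared state.
    private final Map<String, Map<String, Long>> tables = new HashMap<>();

    /** Adds delta to the value stored under key in the named state. */
    void add(String stateName, String key, long delta) {
        tables.computeIfAbsent(stateName, name -> new HashMap<>()).merge(key, delta, Long::sum);
    }

    /** Current value for key in the named state (0 if absent). */
    long get(String stateName, String key) {
        return tables.getOrDefault(stateName, Collections.emptyMap()).getOrDefault(key, 0L);
    }

    int tableCount() {
        return tables.size();
    }

    int entryCount(String stateName) {
        return tables.getOrDefault(stateName, Collections.emptyMap()).size();
    }
}
```

Counting 1,000 different email ids in a state named "countByEmailId" leaves tableCount() at 1 and entryCount("countByEmailId") at 1,000; adding more keys grows the table, not the number of tables.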

Flink isn't logging once we use RollingFileAppender

2017-08-09 Thread Hussein Baghdadi
Hello, We changed our log4j.properties file of our Flink cluster to: log4j.appender.file=org.apache.log4j.RollingFileAppender log4j.appender.file.File=/opt/flink/log/flink.log log4j.appender.file.MaxFileSize=100MB log4j.appender.file.MaxBackupIndex=2 log4j.appender.file.layout=org.apache.log4j.Pa

Re: Getting JobManager address and port within a running job

2017-08-09 Thread Aljoscha Krettek
Ok, I saw that one. Unfortunately it's still not clear to me how that would work. Could you maybe highlight an actual flow of events and describe what will (or should) happen? Describe the current state of the system, i.e. what state is there, then describe what happens when events with given ke

Queryable State max number of clients

2017-08-09 Thread Ziyad Muhammed
Hi all, I'm trying to understand how many parallel clients will be supported by the queryable state. - query.server.network-threads: number of network (event loop) threads for the KvStateServer (0 => #slots) - query.server.query-threads: number of asynchronous query threads for the K
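
The "(0 => #slots)" rule for query.server.network-threads can be spelled out in a tiny hypothetical helper (not a Flink API). It only resolves the thread count; by itself it says nothing about how many concurrent clients those threads can serve.

```java
/** Hypothetical helper resolving query.server.network-threads, where 0 means "one per slot". */
final class QueryServerThreads {

    private QueryServerThreads() {}

    static int effectiveNetworkThreads(int configuredThreads, int numberOfSlots) {
        if (configuredThreads < 0 || numberOfSlots < 1) {
            throw new IllegalArgumentException("threads must be >= 0 and slots >= 1");
        }
        // 0 is a sentinel: fall back to the number of task slots.
        return configuredThreads == 0 ? numberOfSlots : configuredThreads;
    }
}
```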

Evolving serializers and impact on flink managed states

2017-08-09 Thread Biplob Biswas
Hi, We have a set of XSDs which define our data schema, and we are using the corresponding POJOs for serialization with Flink's serialization stack. Now, I was concerned about any evolution of our XSD schema, which will subsequently change the generated POJOs, which in turn are used for crea

Re: Evolving serializers and impact on flink managed states

2017-08-09 Thread Stefan Richter
Hi, > 1. The article in question probably makes use of Flink serialization, what > if I use Avro serde for the serialization and deserialization part. If I > create a savepoint of my job, stop my flink, load the new POJO and continue > from the savepoint, would avro's schema evolution feature perf

Re: Experiencing long latency while using sockets

2017-08-09 Thread Chao Wang
Thank you, Fabian. Maybe there are also some buffers sitting between the data source and the first operator? I observed that in my implementation of SourceFunction (using a socket server, as listed in the previous email), for receiving two messages, in terms of event time, it took 0.2 ms before the Sou

Re: Getting JobManager address and port within a running job

2017-08-09 Thread Biplob Biswas
Hi Aljoscha, So basically, I am reading events from a Kafka topic. These events have corresponding eventIds and a list of modes. *Job1* 1. When I first read from the Kafka topic, I key by the eventIds and use a ProcessFunction to create a state named "events". Also, the list of modes is used to

Naming operators to reflect in UI

2017-08-09 Thread Raja . Aravapalli
Hi, Can someone please let me know whether I can name the operators so that the names show up in the UI? Right now, the UI only shows "Source: Custom Source" and "Sink: Unnamed". Please advise. Thank you. Regards, Raja.

Re: Naming operators to reflect in UI

2017-08-09 Thread Ziyad Muhammed
Hi, You can set the name of any operator explicitly by calling .name(String customName) on the operator. For details: https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/java/op

Re: Using Hadoop 2.8.0 in Flink Project for S3A Path Style Access

2017-08-09 Thread Eron Wright
For reference: [FLINK-6466] Build Hadoop 2.8.0 convenience binaries On Wed, Aug 9, 2017 at 6:41 AM, Aljoscha Krettek wrote: > So you're saying that this works if you manually compile Flink for Hadoop > 2.8.0? If yes, I think the solution is that we have to provide binaries for > Hadoop 2.8.0. If

Re: [EXTERNAL] Re: Naming operators to reflect in UI

2017-08-09 Thread Raja . Aravapalli
Thanks Ziyad. Will check that. Regards, Raja. From: Ziyad Muhammed Date: Wednesday, August 9, 2017 at 12:52 PM To: Raja Aravapalli Cc: "user@flink.apache.org" Subject: [EXTERNAL] Re: Naming operators to reflect in UI Hi, You can set the name of any operator explicitly by calling .name(Str

Re: Flink doesn't free YARN slots after restarting

2017-08-09 Thread Bowen Li
Hi Till, Thanks for looking into this issue. We are not comfortable sending logs to an email list that is this open, so I'll send the logs to you. Thanks, Bowen On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann wrote: > Hi Bowen, > > if I'm not mistaken, then Flink's current Yarn implementation does n

Re: Experiencing long latency while using sockets

2017-08-09 Thread Chao Wang
It seems that the observed long latencies were due to a one-time internal mechanism that only occurred after Flink had received the first message. Based on my measurements, that mechanism took around 100 ms. Now I set up my application the following way, and I observed that the end-to-end l

difference between checkpoints & savepoints

2017-08-09 Thread Raja . Aravapalli
Hi, Can someone please help me understand the difference between Flink's checkpoints and savepoints? Although I read the documentation, I couldn't understand the difference! :s Thanks a lot. Regards, Raja.