SQL Does Not Support Custom Triggers

2018-06-21 Thread YennieChen88
I found that Flink SQL uses a specific default trigger, which does not fire until the window closes. But sometimes we need to fire before the window closes. As the class WindowAssigner provides the method getDefaultTrigger with the parameter StreamExecutionEnvironment, how about passing a
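For comparison, the DataStream API already lets you set an early-firing trigger explicitly. A sketch (not runnable on its own) using Flink's built-in ContinuousEventTimeTrigger; `input`, `Event`, and the `getKey()`/`merge()` methods are placeholders:

```java
// Sketch only: `input` is a hypothetical DataStream<Event>.
input
    .keyBy(e -> e.getKey())
    .window(TumblingEventTimeWindows.of(Time.minutes(10)))
    // fires every 10 s of event time, in addition to the final firing at window close
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
    .reduce((a, b) -> a.merge(b));
```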

Re: Need a map-like state in an operator state

2018-06-21 Thread xsheng
Solved it by using a key selector that returns a constant, which creates a "pseudo" keyedStream with only one partition. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
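The effect of this workaround can be illustrated without Flink: a constant key selector maps every element to one key, so hash partitioning sends everything to a single partition. A minimal self-contained sketch (the partitioning below is a simplified model — Flink really routes keys through murmur-hashed key groups, but the effect is the same):

```java
import java.util.function.Function;
import java.util.stream.Stream;

public class ConstantKeyDemo {
    // Stand-in for Flink's KeySelector: every element maps to the same key.
    static <T> Function<T, Integer> constantKey() {
        return element -> 0;
    }

    // Simplified model of hash partitioning: key hash modulo parallelism.
    static int partitionFor(int key, int parallelism) {
        return Math.abs(Integer.hashCode(key)) % parallelism;
    }

    public static void main(String[] args) {
        Function<String, Integer> selector = constantKey();
        long distinctPartitions = Stream.of("a", "b", "c", "d")
                .map(selector)
                .map(k -> partitionFor(k, 4))
                .distinct()
                .count();
        // Every element lands in the same partition, i.e. one distinct partition.
        System.out.println(distinctPartitions);
    }
}
```

Note the trade-off xsheng implies: with one key there is only one partition, so the keyed operator effectively runs with parallelism 1.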

Need a map-like state in an operator state

2018-06-21 Thread xsheng
Hi All, I'm sorry if I'm double posting, but I posted it before subscribing and I don't see it in my post list. So I'm posting it again. The Flink app we are trying to build is as such: read from Kafka, sort the messages according to some dependency rules, and only send messages that have

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Garrett Barton
Actually, random thought, could YARN preemption be causing this? What is the failure scenario if a working task manager that is doing real work goes down in YARN? The docs make it sound like it should fire up another TM and get back to work out of the box, but I'm not seeing that. On Thu,

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Garrett Barton
Thank you all for the reply! I am running batch jobs, I read in a handful of files from HDFS and output to HBase, HDFS, and Kafka. I run into this when I have partial usage of the cluster as the job runs. So right now I spin up 20 nodes with 3 slots, my job at peak uses all 60 slots, but by the

Re: How to set log level using Flink Docker Image

2018-06-21 Thread Guilherme Nobre
Thanks, that sounds reasonable. I'll try it out tomorrow :) Cheers, G On Thu, Jun 21, 2018 at 6:56 PM Dominik Wosiński wrote: > You can for example mount the conf directory using docker volumes. > > You would need to have logback.xml and then mount it as conf/logback.xml > inside the

Re: How to set log level using Flink Docker Image

2018-06-21 Thread Dominik Wosiński
You can, for example, mount the conf directory using docker volumes. You would need to have logback.xml and then mount it as conf/logback.xml inside the image. Locally, you could do this using docker-compose.yml; for mounting volumes in Kubernetes refer to this page:
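A sketch of the local docker-compose approach Dominik describes. The service name and image tag are examples; /opt/flink/conf is the conf directory in the official Flink image:

```yaml
# docker-compose.yml (fragment) — mount a local logback.xml over the image's copy
services:
  jobmanager:
    image: flink:latest
    volumes:
      - ./logback.xml:/opt/flink/conf/logback.xml
```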

How to set log level using Flink Docker Image

2018-06-21 Thread Guilherme Nobre
Hi everyone, I have a Flink Cluster created from Flink's Docker Image. I'm trying to set log level to DEBUG but couldn't find how. I know where logback configuration files are as per documentation: "The conf directory contains a logback.xml file which can be modified and is used if Flink is

Re: # of active session windows of a streaming job

2018-06-21 Thread Dongwon Kim
Hi Fabian and Chesnay, Thank you guys. Fabian: Unfortunately, as Chesnay said, MetricGroup doesn't allow a ProcessWindowFunction to access a counter defined in a Trigger. Chesnay: I'm going to follow your advice on how to modify Flink. Thank you very much! Best, - Dongwon On Thu, Jun 21,

Re: Backpressure from producer with flink connector kinesis 1.4.2

2018-06-21 Thread Liu, Gavin (CAI - Atlanta)
Thanks, Gordon. Glad to hear you confirm this. I learned a lot from the open PR, btw. I wonder, apart from adding backpressure support in the producer, is there any other way to protect YARN from crashing, e.g., through configuration? From: "Tzu-Li (Gordon) Tai" Date: Thursday, June 21, 2018 at

Re: # of active session windows of a streaming job

2018-06-21 Thread Chesnay Schepler
Without modifications to Flink? No. By design nothing can intercept or retrieve metrics with the metrics API. For this pattern the usual recommendation is to explicitly pass the metric to components that require it. If modifications are an option, what you could do is * define a counter in the

Re: How to use broadcast variables in data stream

2018-06-21 Thread Fabian Hueske
That's correct. Broadcast state was added with Flink 1.5. You can use DataStream.broadcast() in Flink 1.3 but it has a few limitations. For example, you cannot connect a keyed and a broadcasted stream. 2018-06-21 11:58 GMT+02:00 zhen li : > Thanks for your reply. > But broadcast state seems

Re: Flink 1.5 TooLongFrameException in cluster mode?

2018-06-21 Thread Chesnay Schepler
Due to the job-submission changes in 1.5 you attempted to send REST requests to Akka, which causes the exception you received. Instead, you want to specify the REST port, which by default is 8081 (configurable via rest.port). On 21.06.2018 10:44, chrisr123 wrote: This looks related
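Applied to the CLI invocation from the original report, the fix looks like this (host name is a placeholder):

```shell
# Fails in 1.5: 6123 is the Akka RPC port, which rejects REST requests
$FLINK_HOME/bin/flink run -m jobmanagermachine:6123 -c $CLASS $JARFILE

# Works: target the REST port instead (default 8081, configurable via rest.port)
$FLINK_HOME/bin/flink run -m jobmanagermachine:8081 -c $CLASS $JARFILE
```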

Re: How to use broadcast variables in data stream

2018-06-21 Thread zhen li
Thanks for your reply. But broadcast state seems not to be supported in Flink 1.3. I found this in Flink 1.3: Broadcasting DataStream → DataStream Broadcasts elements to every partition. dataStream.broadcast(); But I don't know how to convert it to a list and access it in the stream context. On

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
The reason why you still have to do it is because we still have to support the legacy mode where the client needs to know the JobManager RPC address. Once we remove the legacy mode, we could change the HighAvailabilityServices such that we have client facing HA services which only retrieve the

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Sampath Bhat
Hi, yes, I've specified rest.address for the Flink client to connect to, and it is valid and working fine. But my question is: why am I supposed to give jobmanager.rpc.address for the Flink client to connect to the Flink cluster, if the client depends only on rest.address?

Re: How to use broadcast variables in data stream

2018-06-21 Thread Fabian Hueske
Hi, if the list is static and not too large, you can pass it as a parameter to the function. Function objects are serialized (using Java's default serialization) and shipped to the workers for execution. If the data is dynamic, you might want to have a look at Broadcast state [1]. Best, Fabian
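The mechanism Fabian describes — the function object, together with any list held in its fields, is Java-serialized and shipped to the workers — can be demonstrated without Flink. `EnrichFunction` below is a made-up stand-in for a RichMapFunction that takes the static list in its constructor:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class ShippedFunctionDemo {
    // Stand-in for a Flink function that carries a static list. The list is a
    // field, so default Java serialization ships it along with the function.
    static class EnrichFunction implements Serializable {
        private final List<String> whitelist;
        EnrichFunction(List<String> whitelist) { this.whitelist = whitelist; }
        boolean accepts(String value) { return whitelist.contains(value); }
    }

    public static void main(String[] args) throws Exception {
        EnrichFunction original = new EnrichFunction(Arrays.asList("a", "b"));

        // Round-trip through Java serialization, as happens when a function
        // object is distributed to the workers.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(original);
        out.flush();
        EnrichFunction shipped = (EnrichFunction) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();

        // The deserialized copy still carries the list passed to the constructor.
        System.out.println(shipped.accepts("a"));
        System.out.println(shipped.accepts("z"));
    }
}
```

This is why the approach only suits static data: the list is frozen at job-submission time, which is exactly the limitation that broadcast state addresses for dynamic data.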

Re: # of active session windows of a streaming job

2018-06-21 Thread Fabian Hueske
Hi Dongwon, Yes, the counter state should be stored in operator state which is not available on Triggers. Chesnay: Can a window function (like ProcessWindowFunction) access (read, write) the counter of its associated Trigger to checkpoint and restore it? Best, Fabian 2018-06-20 16:59 GMT+02:00

Re: Backpressure from producer with flink connector kinesis 1.4.2

2018-06-21 Thread Tzu-Li (Gordon) Tai
Hi, According to the description in [1], then yes, I think it is expected that eventually YARN containers running TMs that execute the producer sink subtasks will be killed due to memory problems. It seems that the KPL client is only a wrapper around a C++ daemon process, so it actually

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Till Rohrmann
Hi Garrett, killing of idle TaskManagers should not affect the execution of the job. By definition, a TaskManager only idles if it does not execute any tasks. Could you maybe share the complete logs (of the cluster entrypoint and all TaskManagers) with us? Cheers, Till On Thu, Jun 21, 2018 at

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Siew Wai Yow
Thanks @SihuaZhou, you are right that this is a bug. Just checked the source code too. @Chesnay Schepler, Tested with both checkpoint and savepoint on the same file system and it is working as expected. Thanks guys! -Yow From: Chesnay

Re: Flink 1.5 TooLongFrameException in cluster mode?

2018-06-21 Thread chrisr123
This looks related to using the -m option on CLI: This works: $FLINK_HOME/bin/flink run -c $CLASS $JARFILE but this causes the error: $FLINK_HOME/bin/flink run -m jobmanagermachine:6123 -c $CLASS $JARFILE I found this thread here from April 27

Re: A question about Kryo and Window State

2018-06-21 Thread Tzu-Li (Gordon) Tai
Hi Vishal, Kryo has a serializer called `CompatibleFieldSerializer` that allows for simple backward compatibility changes, such as adding non-optional fields / removing fields. If using the KryoSerializer is a must, then a good thing to do is to register Kryo's `CompatibleFieldSerializer` as
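Registering the serializer Gordon mentions is a one-liner on the ExecutionConfig. A sketch, not runnable on its own; `MyStateClass` is a placeholder for the type held in window state:

```java
// Sketch: make Kryo use CompatibleFieldSerializer for MyStateClass, so that
// adding/removing fields remains backward compatible.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(
        MyStateClass.class, CompatibleFieldSerializer.class);
```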

Re: Debug job execution from savepoint

2018-06-21 Thread Fabian Hueske
Hi Manuel, I had a look and couldn't find a way to do it. However, this sounds like a very useful feature to me. Would you mind creating a Jira issue [1] for that? Thanks, Fabian [1] https://issues.apache.org/jira/projects/FLINK 2018-06-18 16:23 GMT+02:00 Haddadi Manuel : > Hi all, > > > I

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Fabian Hueske
Hi Garrett, I agree, there seems to be an issue and increasing the timeout should not be the right approach to solve it. Are you running streaming or batch jobs, i.e., do some of the tasks finish much earlier than others? I'm adding Till to this thread who's very familiar with scheduling and

Re: Control insert database with dataset

2018-06-21 Thread Fabian Hueske
Hi Dulce, This functionality is not supported by the JDBCOutputFormat. Some database systems (AFAIK, MySQL) support Upsert writes, i.e., writes that insert if the primary key is not present or update the row if the PK exists. Not sure if that would meet your requirements. If you don't want to go
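For reference, the MySQL upsert Fabian mentions has this statement form (table and columns are hypothetical):

```sql
-- Insert the row, or update it when the primary key already exists
INSERT INTO totals (id, cnt)
VALUES (42, 1)
ON DUPLICATE KEY UPDATE cnt = VALUES(cnt);
```

Since JDBCOutputFormat just executes the statement you give it, passing an upsert statement like this is one way to approximate update semantics without custom sink code.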

Re: A question about Kryo and Window State

2018-06-21 Thread Fabian Hueske
Hi Vishal, In general, Kryo serializers are not very upgrade friendly. Serializer compatibility [1] might be right approach here, but Gordon (in CC) might know more about this. Best, Fabian [1]

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
Thank you for the clarification. On Thu, Jun 21, 2018 at 1:36 PM sihua zhou wrote: > Yes, you can clear the state for a key(the currently active key), if you > clear it, it means that you have also cleaned it from the state backend, > and the future checpoints won't contains the key anymore

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Yes, you can clear the state for a key (the currently active key); if you clear it, it means that you have also cleaned it from the state backend, and future checkpoints won't contain the key anymore unless you add it again. Best, Sihua On 06/21/2018 16:04, Garvit Sharma wrote: Now,

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
Now, after clearing state for a key, I don't want that redundant data in the state backend. This is my concern. Please let me know if there are any gaps. Thanks, On Thu, Jun 21, 2018 at 1:31 PM Garvit Sharma wrote: > I am maintaining state data for a key in ValueState. As per [0] I can >

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
I am maintaining state data for a key in ValueState. As per [0] I can clear() state for that key. [0] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html Please let me know. Thanks, On Thu, Jun 21, 2018 at 1:19 PM sihua zhou wrote: > Hi Garvit, > > Let's
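The clear() call from [0], sketched inside a keyed function (not runnable on its own; `Event`, `isDone()`, and `counterState` are placeholders — `counterState` would be a ValueState obtained via getRuntimeContext().getState(...) in open()):

```java
// Sketch: clearing keyed state for the currently active key.
@Override
public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
    if (isDone(value)) {
        // Removes this key's entry from the state backend; checkpoints taken
        // after this call no longer contain it.
        counterState.clear();
    }
}
```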

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Hi Garvit, Let's say you clear the state at timestamp t1; the checkpoints completed before t1 will still contain the data you cleared, but future checkpoints won't contain the cleared data again. I'm not sure what you mean by the cleared state, though; you can only clear a

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
So, would it delete all the files in HDFS associated with the cleared state? On Thu, Jun 21, 2018 at 12:58 PM sihua zhou wrote: > Hi Garvit, > > > Now, let's say, we clear the state. Would the state data be removed from > HDFS too? > > The state data would not be removed from HDFS immediately,

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Chesnay Schepler
If you could open a JIRA, that would be great. On 21.06.2018 09:07, sihua zhou wrote: Hi Yow, I had a look at the related code, and I think this seems like a bug. Flink uses the checkpoint path's FileSystem to create the output stream for the savepoint, but in your case the checkpoint & savepoint are

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread sihua zhou
I just created a JIRA for this: https://issues.apache.org/jira/browse/FLINK-9633 On 06/21/2018 15:10, Chesnay Schepler wrote: That's quite weird that it tries to use the local file-system. Maybe it derives the FS from state.backend.fs.checkpointdir, but uses it for savepoints.dir? What happens

Re:Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Hi Garvit, > Now, let's say, we clear the state. Would the state data be removed from HDFS > too? The state data would not be removed from HDFS immediately if you clear the state in your job. But after you clear the state in your job, later completed checkpoints won't contain the

Re: Strictly use TLSv1.2

2018-06-21 Thread Vinay Patil
Hi, I have deployed Flink 1.3.2 and enabled SSL settings. From the SSL debug logs it shows that Flink is using TLSv1.2. However, based on the security scans, we have observed that it also allows TLSv1.0 and TLSv1.1. In order to strictly use TLSv1.2 we have updated the following property of
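The exact property Vinay updated is cut off above; for reference, these are the SSL-related keys documented for Flink (the cipher list shown is only an example):

```yaml
# flink-conf.yaml (fragment)
security.ssl.protocol: TLSv1.2
security.ssl.algorithms: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_RSA_WITH_AES_128_CBC_SHA256
```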

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread sihua zhou
Hi Yow, I had a look at the related code, and I think this seems like a bug. Flink uses the checkpoint path's FileSystem to create the output stream for the savepoint, but in your case the checkpoint & savepoint are not using the same file system. A workaround is to use the same file system for both

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
Hi, if the rest.address is different from the jobmanager.rpc.address, then you should specify that in the flink-conf.yaml and Flink will connect to rest.address. Only if rest.address is not specified, the system will fall back to use the jobmanager.rpc.address. Currently, the rest server endpoint
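The lookup order Till describes, illustrated as a flink-conf.yaml fragment (host names are examples):

```yaml
# flink-conf.yaml (fragment)
rest.address: rest-host                   # preferred by the client when set
rest.port: 8081                           # default REST port
jobmanager.rpc.address: jobmanager-host   # client fallback when rest.address is unset
```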

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Chesnay Schepler
That's quite weird that it tries to use the local file-system. Maybe it derives the FS from state.backend.fs.checkpointdir, but uses it for savepoints.dir? What happens if you set state.backend.fs.checkpointdir also to HDFS? On 21.06.2018 08:07, Siew Wai Yow wrote: Thanks Chesnay, the

Re: Strictly use TLSv1.2

2018-06-21 Thread Vinay Patil
Hi, Can someone please help me with this issue? Regards, Vinay Patil

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Sampath Bhat
Hello Till, Thanks for the clarification. But I've a few questions based on your reply. In non-HA setups we need the jobmanager.rpc.address to derive the hostname of the rest server. Why is there a dependency on jobmanager.rpc.address to get the hostname of the rest server? This holds good only for normal

Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
Hi, Consider a managed keyed state backed by HDFS with checkpointing enabled. Now, as the state grows the state data will be saved on HDFS. Now, let's say, we clear the state. Would the state data be removed from HDFS too? How does Flink manage to clear the state data from state backend on

Re: Blob Server Removes Failed Jobs Immediately

2018-06-21 Thread Till Rohrmann
Hi Dominik, all job-related files (non-HA as well as HA) are removed once the job reaches a globally terminal state (FINISHED, CANCELLED, FAILED). This is the case because Flink assumes that the job is done and won't be retried afterwards. Thus, the documentation in the FLIP is not true and

Re: Exception while submitting jobs through Yarn

2018-06-21 Thread Till Rohrmann
Great to hear. Please open a PR for the improvements you'd like to contribute. Cheers, Till On Wed, Jun 20, 2018 at 4:56 PM Garvit Sharma wrote: > So, finally, I have got this working. The issue was because of a poor > library which was using xerces 2.6 :). > > In this process, I found a few

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Siew Wai Yow
Thanks Chesnay, the application will take the value from "state.savepoints.dir" as the default if target-directory is set to null. But then it tries to create the directory on the local machine, which causes the error below, because it is an HDFS directory. The same URL works in previous Flink 1.3.2. Is