Re: [DISCUSS] Create a Flink ecosystem website

2019-04-24 Thread Becket Qin
Thanks for the update, Robert. Looking forward to the website. If there is
already a list of software we need to run the website, we can ask Apache
infra team to prepare the VM for us, as that may also take some time.

On Wed, Apr 24, 2019 at 11:57 PM Robert Metzger  wrote:

> Hey all,
>
> quick update on this project: The frontend and backend code have been put
> together into this repository: https://github.com/sorahn/flink-ecosystem
> We also just agreed on an API specification, and will now work on
> finishing the backend.
>
> It will probably take a few more weeks for this to finish, but we are
> making progress :)
>
> Best,
> Robert
>
>
> On Mon, Apr 15, 2019 at 11:18 AM Robert Metzger 
> wrote:
>
>> Hey Daryl,
>>
>> thanks a lot for posting a link to this first prototype on the mailing
>> list! I really like it!
>>
>> Becket: Our plan forward is that Congxian is implementing the backend for
>> the website. He has already started with the work, but needs at least one
>> more week.
>>
>>
>> [Re-sending this email because the first one was blocked on dev@f.a.o]
>>
>>
>> On Mon, Apr 15, 2019 at 7:59 AM Becket Qin  wrote:
>>
>>> Hi Daryl,
>>>
>>> Thanks a lot for the update. The site looks awesome! This is a great
>>> progress. I really like the conciseness of GUI.
>>>
>>> One minor suggestion is that for the same library, there might be
>>> multiple versions compatible with different Flink versions. It would be
>>> good to show that somewhere in the project page as it seems important to
>>> the users.
>>>
>>> BTW, will you share the plan to move forward? Would additional hands
>>> help?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Sat, Apr 13, 2019 at 7:10 PM Daryl Roberts 
>>> wrote:
>>>
 > Shall we add a guide page to show people how to publish their
 projects to the website? The exact rules can be discussed and drafted in a
 separate email thread IMO

 This is a good idea. (Both the guise, and separate thread), I think
 once there is an actual packed in place we’ll be in a lot better position
 to discuss this.

 > The "Log in with Github" link doesn't seem to work yet. Will it only
 allow login for admins and publishers, or for everyone?

 Correct, all the oauth stuff requires a real server. We are currently
 just faking everything.

 I will add a mock-login page (username/password that just accepts
 anything and displays whatever username you type in) so we can see the
 add-comment field and add-packages page once they exist.






Re: Flink Stream SQL group by TUMBLE(rowtime,)

2019-04-24 Thread liu_mingzhang
您好,请问 【没有offset从earliest开始,有则从offset开始】这个是怎么实现的啊


On 4/24/2019 16:46,邵志鹏 wrote:
大家好,问题求助:


事件时间使用EventTime,默认Checkpoint【没有offset从earliest开始,有则从offset开始】。


assignTimestampsAndWatermarks,1、不使用Watermark,即默认eventtime时间戳;2、使用官方max取最大值;两种情况问题基本相同:
问题描述:
比如,发送40条数据,窗口消费33条。另外7条,需要继续发送新的数据,才会被消费掉,即便重启程序-也要发送新的数据,才会消费上次"未及时"消费的数据,而不是自动从上一次的offset+1开始。


SQL:
SELECT astyle, TUMBLE_START(rowtime, INTERVAL '10' SECOND) time_start, 
TUMBLE_END(rowtime, INTERVAL '10' SECOND) time_end, SUM(energy) AS sum_energy, 
CAST(COUNT(aid) AS INT) AS cnt, CAST(AVG(age) AS INT) AS avg_age FROM t_pojo 
GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), astyle


assignTimestampsAndWatermarks,3、使用System.currentTimeMillis()作为Watermark,消息消费是及时了,但是又有新的问题:
即程序启动后,相当于从latest开始消费消息,不能消费未消费过的offset【即使换掉group.id消费者组也是如此】。


https://stackoverflow.com/questions/55499764/how-to-let-flink-flush-last-line-to-sink-when-producerkafka-does-not-produce-n
https://stackoverflow.com/questions/55779711/why-not-on-time-when-i-consumed-kafka-message-using-flink-streaming-sql-group-by


另外,UTC时区的问题,目前是extractTimestamp的时候,+2880。很想知道大神们的解决方法是什么,或者就是保存UTC时间,然后结果数据对外的时候再做处理?





Re: QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-24 Thread Guowei Ma
You could try to set queryable-state.enable to true. And check again.

Vishal Santoshi 于2019年4月25日 周四上午1:40写道:

> Any one ?
>
> On Wed, Apr 24, 2019 at 12:02 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Hello folks,
>>
>>  Following
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>  .
>> for setting up the Queryable Server and proxy, I have my classpath ( the
>> lib directory ) that has the  required jar, But I do not see the mentioned
>> log and of course am not able to set up the QS server/Proxy . This has
>> worked on 1.7.2 and I think I have everything as advised, see the logs
>> below. I do not  see this log  "Started the Queryable State Proxy Server
>> @ ...".  Any one with this issue...
>>
>>
>>
>> 2019-04-24 15:54:26,296 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>   - -Dtaskmanager.numberOfTaskSlots=1
>>
>> 2019-04-24 15:54:26,296 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>   - --configDir
>>
>> 2019-04-24 15:54:26,296 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>   - /usr/local/flink/conf
>>
>> 2019-04-24 15:54:26,296 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>   -  Classpath:
>> /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:
>> */usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*
>> :/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::
>>
>> 2019-04-24 15:54:26,296 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>   -
>> 
>>
>> 2019-04-24 15:54:26,298 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>   - Registered UNIX signal handlers for [TERM, HUP, INT]
>>
>> 2019-04-24 15:54:26,300 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>   - Maximum number of open file descriptors is 65536.
>>
>> 2019-04-24 15:54:26,305 INFO
>> org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: state.backend.fs.checkpointdir,
>> hdfs:///flink-checkpoints_k8s_test/prod
>>
>> 2
>>
>>
>>
>>
>> --
Best,
Guowei


Job Startup Arguments

2019-04-24 Thread Steven Nelson
Hello!

Is there a way (via the REST API) to see the parameters used to start a job?

-Steve


Re: AskTimeoutException

2019-04-24 Thread Alex Soto
I found the issue was a hard-coded timeout value in MiniCluster class, which is 
used for stand alone execution:


public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
this.miniClusterConfiguration = 
checkNotNull(miniClusterConfiguration, "config may not be null");

this.rpcTimeout = Time.seconds(10L);
this.terminationFuture = 
CompletableFuture.completedFuture(null);
running = false;
}


This was fixed in later versions:  
https://issues.apache.org/jira/browse/FLINK-11690

So the solution is to upgrade to 1.7.3 or 1.8.0

Best regards,
Alex soto




> On Apr 24, 2019, at 1:39 PM, Alex Soto  wrote:
> 
> Thanks Abdul for the help.  So I added this:
> 
>   cfg.setString(AkkaOptions.LOOKUP_TIMEOUT, "2 min");
> 
> 
> But I am still I am getting the same error:
> 
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher62bab021-4a79-4d10-8d45-7a33c493a925#-199361569
>  
> ]]
>  after [1 ms]. Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> 
> 
> 
> Best regards,
> Alex soto
> 
> 
> 
> 
>> On Apr 12, 2019, at 6:34 PM, Abdul Qadeer > > wrote:
>> 
>> Hi Alex,
>> 
>> The timeout shown in the exception is due to  AkkaOptions.LOOKUP_TIMEOUT
>> 
>> On Fri, 12 Apr 2019 at 09:45, Alex Soto > > wrote:
>> Hello,
>> 
>> I am using Flink version 1.7.1. In a unit test, I create a local environment:
>> 
>>  Configuration cfg = new Configuration();
>>  cfg.setString(AkkaOptions.ASK_TIMEOUT, "2 min");
>>  cfg.setString(AkkaOptions.CLIENT_TIMEOUT, "2 min");
>> 
>>  LocalEnvironment env = 
>> ExecutionEnvironment.createLocalEnvironment(cfg);
>> 
>> Yet, when I run the test, I am getting the following error:
>> 
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
>> [Actor[akka://flink/user/dispatcher87b320bd-c3c8-485f-82f7-113f52fb46a1#-1843625489
>>  <>]] after [1 ms]. Sender[null] sent message of type 
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>  at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
>>  at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>  at 
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>>  at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>>  at 
>> scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>>  at 
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>>  at 
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>>  at 
>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>>  at 
>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>>  at 
>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>>  at java.lang.Thread.run(Thread.java:748)
>> 
>> 
>> 
>> The question is why doesn’t Flink honor the timeout configuration values I 
>> am passing when creating the local environment.  I am passing 2 minutes, bu 
>> the error message says it timed out after 10 seconds.
>> 
>> 
>> Best regards,
>> Alex soto
>> 
>> 
>> 
>> 
> 



Re: Watermark for each key?

2019-04-24 Thread Lasse Nedergaard
Thanks Till

What about this workaround. 
If I after the watermark assignment split the stream in elements that fits in 
the watermark (s1) and those that don’t (s2). The s1 I process with the table 
api with a window aggregate using watermark and s2 I handle with an unbounded 
non-windows aggregate with IdleStateRentionTime so state are removed when my 
devices are up to date again. I then merge the two outputs and continue. 
By doing this I handle 99% as standard and only keeping state for the late 
data. 

Make sense? And would it work?

Med venlig hilsen / Best regards
Lasse Nedergaard


> Den 24. apr. 2019 kl. 19.00 skrev Till Rohrmann :
> 
> Hi Lasse,
> 
> at the moment this is not supported out of the box by Flink. The community 
> thought about this feature but so far did not implement it. Unfortunately, 
> I'm also not aware of an easy workaround one could do in the user code space.
> 
> Cheers,
> Till
> 
>> On Wed, Apr 24, 2019 at 3:26 PM Lasse Nedergaard  
>> wrote:
>> Hi.
>> 
>> We work with IoT data and we have cases where the IoT-device delay data 
>> transfer if it can't get network access. We would like to use table windows 
>> aggregate function over each device to calculate some statistics, but for 
>> windows aggregate functions to work we need to assign a watermark. This 
>> watermark is general for all devices. We can set allow latency, but we can't 
>> set it to months. 
>> So what we need is to have a watermark for each device (key by) so the 
>> window aggregate work on the timestamp delivered for the device and not the 
>> global watermark. 
>> Is that possible, or have anyone consider this feature?
>> 
>> Best 
>> 
>> Lasse Nedergaard


Re: QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-24 Thread Vishal Santoshi
Any one ?

On Wed, Apr 24, 2019 at 12:02 PM Vishal Santoshi 
wrote:

> Hello folks,
>
>  Following
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>  .
> for setting up the Queryable Server and proxy, I have my classpath ( the
> lib directory ) that has the  required jar, But I do not see the mentioned
> log and of course am not able to set up the QS server/Proxy . This has
> worked on 1.7.2 and I think I have everything as advised, see the logs
> below. I do not  see this log  "Started the Queryable State Proxy Server
> @ ...".  Any one with this issue...
>
>
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - -Dtaskmanager.numberOfTaskSlots=1
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - --configDir
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - /usr/local/flink/conf
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   -  Classpath:
> /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:
> */usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*
> :/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::
>
> 2019-04-24 15:54:26,296 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   -
> 
>
> 2019-04-24 15:54:26,298 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Registered UNIX signal handlers for [TERM, HUP, INT]
>
> 2019-04-24 15:54:26,300 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>   - Maximum number of open file descriptors is 65536.
>
> 2019-04-24 15:54:26,305 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: state.backend.fs.checkpointdir,
> hdfs:///flink-checkpoints_k8s_test/prod
>
> 2
>
>
>
>
>


Re: AskTimeoutException

2019-04-24 Thread Alex Soto
Thanks Abdul for the help.  So I added this:

cfg.setString(AkkaOptions.LOOKUP_TIMEOUT, "2 min");


But I am still I am getting the same error:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher62bab021-4a79-4d10-8d45-7a33c493a925#-199361569]]
 after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".



Best regards,
Alex soto




> On Apr 12, 2019, at 6:34 PM, Abdul Qadeer  wrote:
> 
> Hi Alex,
> 
> The timeout shown in the exception is due to  AkkaOptions.LOOKUP_TIMEOUT
> 
> On Fri, 12 Apr 2019 at 09:45, Alex Soto  > wrote:
> Hello,
> 
> I am using Flink version 1.7.1. In a unit test, I create a local environment:
> 
>   Configuration cfg = new Configuration();
>   cfg.setString(AkkaOptions.ASK_TIMEOUT, "2 min");
>   cfg.setString(AkkaOptions.CLIENT_TIMEOUT, "2 min");
> 
>   LocalEnvironment env = 
> ExecutionEnvironment.createLocalEnvironment(cfg);
> 
> Yet, when I run the test, I am getting the following error:
> 
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher87b320bd-c3c8-485f-82f7-113f52fb46a1#-1843625489
>  <>]] after [1 ms]. Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>   at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
>   at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
>   at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
>   at 
> scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
>   at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
>   at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>   at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 
> 
> The question is why doesn’t Flink honor the timeout configuration values I am 
> passing when creating the local environment.  I am passing 2 minutes, bu the 
> error message says it timed out after 10 seconds.
> 
> 
> Best regards,
> Alex soto
> 
> 
> 
> 



Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-24 Thread Rong Rong
Hi Mans,

Sameer is correct. if you would like to control window triggering based on
other elements that does not belong to this window (in a keyed stream
context) then this is probably the best way to approach.

I think you've also posted in another thread that describes what will be
left after fire-and-purge [1]. As Fabian stated: the only thing that
might've left after is the window (which is the 2 long values indicate the
start/end) and the trigger object. But you are right it might eventually
filled up memory.

Another approach is to implement your own operator that handles all these
internally by your user code. This would require you to replicate many of
the window operator logic though.

Thanks,
Rong

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-How-to-destroy-global-window-and-release-it-s-resources-td27191.html#a27212

On Wed, Apr 24, 2019 at 5:02 AM Sameer W  wrote:

> Global Windows is fine for this use case. I have used the same strategy.
> You just define custom evictors and triggers and you are all good. Windows
> are managed by keys, so as such as long as events are evicted from the
> window, that counts towards reclaiming memory for the key+window
> combination. Plus there is just window per key with Global Windows.
>
> On Wed, Apr 24, 2019 at 7:47 AM M Singh  wrote:
>
>> Hi Rong:
>>
>> Thanks for your answer.
>>
>> From what I understand the dynamic gap session windows are also created
>> when the event is encountered.  I need to be able to change the window end
>> time at a later time based on what other events are in that window.  One
>> way to do this is to use GlobalWindows but then these are never deleted.
>>
>> Regarding CEP option - I believe that CEP patterns cannot be changed
>> dynamically once they've been complied which limits it usage.
>>
>> Please feel free to correct me.
>>
>> Thanks for your help and pointers.
>>
>> On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong <
>> walter...@gmail.com> wrote:
>>
>>
>> Hi Mans,
>>
>> I am not sure what you meant by "dynamically change the end-time of a
>> window. If you are referring to dynamically determines the firing time of
>> the window, then it fits into the description of session window [1]:
>> If you want to handle window end time dynamically, one way of which I can
>> think of is the dynamic gap, session window [1] approach. with which you
>> can specify the end-time of a window based on input elements. Provided that
>> you are maintaining a session window.
>> Another way to look at it is through the Flink-CEP library [2].
>>
>> Thanks,
>> Rong
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
>>
>> On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:
>>
>> Hi:
>>
>> I am working on a project and need to change the end time of the window
>> dynamically.  I want to find out if the end time of the window is used
>> internally (for sorting windows/etc) except for handling watermarks that
>> would cause problems if the end time was changed during run time after the
>> window has been created even if no new event has arrived for that window.
>>
>> I don't want to use GlobalWindow since from my understanding it never
>> gets destroyed.
>>
>> If there is any alternate way of dealing with this, please let me know.
>>
>> Thanks
>>
>> Mans
>>
>>


Re: Fast restart of a job with a large state

2019-04-24 Thread Sergey Zhemzhitsky
Hi Till,

Thanks for the info!
It's good to know.

Regards,
Sergey


On Wed, Apr 24, 2019, 13:08 Till Rohrmann  wrote:

> Hi Sergey,
>
> at the moment neither local nor incremental savepoints are supported in
> Flink afaik. There were some ideas wrt incremental savepoints floating
> around in the community but nothing concrete yet.
>
> Cheers,
> Till
>
> On Tue, Apr 23, 2019 at 6:58 PM Sergey Zhemzhitsky 
> wrote:
>
>> Hi Stefan, Paul,
>>
>> Thanks for the tips! Currently I have not tried neither rescaling from
>> checkpoints nor task local recovery. Now it's a subject to test.
>>
>> In case it will be necessary not to just rescale a job, but also to
>> change its DAG - is there a way to have something like let's call it "local
>> savepoints" or "incremental savepoints" to prevent the whole state
>> transferring to and from a distributed storage?
>>
>> Kind Regards,
>> Sergey
>>
>>
>> On Thu, Apr 18, 2019, 13:22 Stefan Richter 
>> wrote:
>>
>>> Hi,
>>>
>>> If rescaling is the problem, let me clarify that you can currently
>>> rescale from savepoints and all types of checkpoints (including
>>> incremental). If that was the only problem, then there is nothing to worry
>>> about - the documentation is only a bit conservative about this because we
>>> will not commit to an APU that all future types checkpoints will be
>>> resealable. But currently they are all, and this is also very unlikely to
>>> change anytime soon.
>>>
>>> Paul, just to comment on your suggestion as well, local recovery would
>>> only help with failover. 1) It does not help for restarts by the user and
>>> 2) also does not work for rescaling (2) is a consequence of 1) because
>>> failover never rescales, only restarts).
>>>
>>> Best,
>>> Stefan
>>>
>>> On 18. Apr 2019, at 12:07, Paul Lam  wrote:
>>>
>>> The URL in my previous mail is wrong, and it should be:
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
>>>
>>> Best,
>>> Paul Lam
>>>
>>> 在 2019年4月18日,18:04,Paul Lam  写道:
>>>
>>> Hi,
>>>
>>> Have you tried task local recovery [1]?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>>
>>> Best,
>>> Paul Lam
>>>
>>> 在 2019年4月17日,17:46,Sergey Zhemzhitsky  写道:
>>>
>>> Hi Flinkers,
>>>
>>> Operating different flink jobs I've discovered that job restarts with
>>> a pretty large state (in my case this is up to 100GB+) take quite a
>>> lot of time. For example, to restart a job (e.g. to update it) the
>>> savepoint is created, and in case of savepoints all the state seems to
>>> be pushed into the distributed store (hdfs in my case) when stopping a
>>> job and pulling this state back when starting the new version of the
>>> job.
>>>
>>> What I've found by the moment trying to speed up job restarts is:
>>> - using external retained checkpoints [1]; the drawback is that the
>>> job cannot be rescaled during restart
>>> - using external state and storage with the stateless jobs; the
>>> drawback is the necessity of additional network hops to this storage.
>>>
>>> So I'm wondering whether there are any best practices community knows
>>> and uses to cope with the cases like this?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>>
>>>
>>>
>>>
>>>


Re: Watermark for each key?

2019-04-24 Thread Till Rohrmann
Hi Lasse,

at the moment this is not supported out of the box by Flink. The community
thought about this feature but so far did not implement it. Unfortunately,
I'm also not aware of an easy workaround one could do in the user code
space.

Cheers,
Till

On Wed, Apr 24, 2019 at 3:26 PM Lasse Nedergaard 
wrote:

> Hi.
>
> We work with IoT data and we have cases where the IoT-device delay data
> transfer if it can't get network access. We would like to use table windows
> aggregate function over each device to calculate some statistics, but for
> windows aggregate functions to work we need to assign a watermark. This
> watermark is general for all devices. We can set allow latency, but we
> can't set it to months.
> So what we need is to have a watermark for each device (key by) so the
> window aggregate work on the timestamp delivered for the device and not the
> global watermark.
> Is that possible, or have anyone consider this feature?
>
> Best
>
> Lasse Nedergaard
>
>


QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-24 Thread Vishal Santoshi
Hello folks,

 Following
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
.
for setting up the Queryable Server and proxy, I have my classpath ( the
lib directory ) that has the  required jar, But I do not see the mentioned
log and of course am not able to set up the QS server/Proxy . This has
worked on 1.7.2 and I think I have everything as advised, see the logs
below. I do not  see this log  "Started the Queryable State Proxy Server @
...".  Any one with this issue...



2019-04-24 15:54:26,296 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - -Dtaskmanager.numberOfTaskSlots=1

2019-04-24 15:54:26,296 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - --configDir

2019-04-24 15:54:26,296 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - /usr/local/flink/conf

2019-04-24 15:54:26,296 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  -  Classpath: /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:
*/usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*
:/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::

2019-04-24 15:54:26,296 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  -


2019-04-24 15:54:26,298 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Registered UNIX signal handlers for [TERM, HUP, INT]

2019-04-24 15:54:26,300 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  - Maximum number of open file descriptors is 65536.

2019-04-24 15:54:26,305 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: state.backend.fs.checkpointdir,
hdfs:///flink-checkpoints_k8s_test/prod

2


Re: [DISCUSS] Create a Flink ecosystem website

2019-04-24 Thread Robert Metzger
Hey all,

quick update on this project: The frontend and backend code have been put
together into this repository: https://github.com/sorahn/flink-ecosystem
We also just agreed on an API specification, and will now work on finishing
the backend.

It will probably take a few more weeks for this to finish, but we are
making progress :)

Best,
Robert


On Mon, Apr 15, 2019 at 11:18 AM Robert Metzger  wrote:

> Hey Daryl,
>
> thanks a lot for posting a link to this first prototype on the mailing
> list! I really like it!
>
> Becket: Our plan forward is that Congxian is implementing the backend for
> the website. He has already started with the work, but needs at least one
> more week.
>
>
> [Re-sending this email because the first one was blocked on dev@f.a.o]
>
>
> On Mon, Apr 15, 2019 at 7:59 AM Becket Qin  wrote:
>
>> Hi Daryl,
>>
>> Thanks a lot for the update. The site looks awesome! This is a great
>> progress. I really like the conciseness of GUI.
>>
>> One minor suggestion is that for the same library, there might be
>> multiple versions compatible with different Flink versions. It would be
>> good to show that somewhere in the project page as it seems important to
>> the users.
>>
>> BTW, will you share the plan to move forward? Would additional hands help?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Sat, Apr 13, 2019 at 7:10 PM Daryl Roberts 
>> wrote:
>>
>>> > Shall we add a guide page to show people how to publish their projects
>>> to the website? The exact rules can be discussed and drafted in a separate
>>> email thread IMO
>>>
>>> This is a good idea. (Both the guise, and separate thread), I think once
>>> there is an actual packed in place we’ll be in a lot better position to
>>> discuss this.
>>>
>>> > The "Log in with Github" link doesn't seem to work yet. Will it only
>>> allow login for admins and publishers, or for everyone?
>>>
>>> Correct, all the oauth stuff requires a real server. We are currently
>>> just faking everything.
>>>
>>> I will add a mock-login page (username/password that just accepts
>>> anything and displays whatever username you type in) so we can see the
>>> add-comment field and add-packages page once they exist.
>>>
>>>
>>>
>>>


Re: Flink CLI

2019-04-24 Thread Oytun Tez
Hi Steven,

As much as I am aware,
1) no update call. our build flow feels a little weird to us as well.
definitely requires scripting.
2) we are using Flink management API remotely in our build flow to 1) get
jobs, 2) savepoint them, 3) cancel them etc. I am going to release a Python
script for this soon.

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Wed, Apr 24, 2019 at 11:06 AM Steven Nelson 
wrote:

> Hello!
>
> I am working on automating our deployments to our Flink cluster. I had a
> couple questions about the flink cli.
>
> 1) I thought there was an "update" command that would internally manage
> the cancel with savepoint, upload new jar, restart from savepoint process.
>
> 2) Is there a way to get the Flink cli to output it's result in a json
> format? Right now I would need to parse the results of the "flink list"
> command to get the job id, cancel the job with savepoint, parse the results
> of that to get the savepoint filename, then restore using that. Parsing the
> output seems brittle to me.
>
> Thought?
> -Steve
>
>


Re: Flink CLI

2019-04-24 Thread Zack Bartel
Hi Steve,
I recently solved this problem using the REST api and some python scripts. The 
script has a function "upgrade_job" which will cancel with savepoint, 
optionally upload a new jar from the local filestystem or S3, and start the job 
from the savepoint including any changes in parallelism. We've used Jenkins to 
upload new jars to S3 and automated the deployment using saltstack and the 
attached python script.

Please let me know if you find a better way to do this!

Zack 

https://github.com/zackb/code/blob/master/python/dink/dink.py 





Re: No zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-24 Thread Till Rohrmann
Good to hear. Could you create a documentation JIRA issue for this problem?
Thanks a lot.

Cheers,
Till

On Wed, Apr 24, 2019 at 4:58 PM Vishal Santoshi 
wrote:

> Verified, I think we just need to make sure that it is documented :)
>
> On Wed, Apr 24, 2019 at 9:47 AM Vishal Santoshi 
> wrote:
>
>> This makes total sense and actually is smart ( defensive ). Will test and
>> report. I think though that this needs to be documented :)
>>
>> On Wed, Apr 24, 2019 at 6:03 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> it seems that the following is happening: You triggered the cancel with
>>> savepoint command from via the REST call. This command is an asynchronous
>>> operation which produces a result (the savepoint path). In order to deliver
>>> asynchronous results to the caller, Flink waits before shutting down until
>>> they are delivered or until it times out after 5 minutes. I assume that you
>>> don't request the savepoint path from Flink via the returned URL from the
>>> original request. This could either happen if you kill the CLI before its
>>> done or if you have written your own method to trigger this operation.
>>>
>>> I guess we could add a flag for asynchronous operations which tells
>>> Flink that their results don't need to get delivered to some client. If you
>>> would like to have such a feature, then please open a JIRA issue for it.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 24, 2019 at 3:49 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Anyione ?



 I think there some race condition .  These are the TM logs.. I am
 puzzled  b'coz in a larger pipe ( there are about 32 lots on 8 replicas and
 it works




 2019-04-24 01:16:20,889 DEBUG
 org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
 Releasing local state under allocation id 5a853ef886e1c599f86b9503306fffd2.

 2019-04-24 01:16:20,894 DEBUG
 org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
 JobManager connection for job .

 org.apache.flink.util.FlinkException: Stopping JobMaster for job
 EventCountJob().

 at
 org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:355)

 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)

 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)

 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

 at
 org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)

 at
 akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

 at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

 at akka.actor.ActorCell.invoke(ActorCell.scala:495)

 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

 at akka.dispatch.Mailbox.run(Mailbox.scala:224)

 at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 2019-04-24 01:16:20,895 INFO
 org.apache.flink.runtime.taskexecutor.JobLeaderService- Cannot
 reconnect to job  because it is not
 registered.

 2019-04-24 01:16:21,053 DEBUG
 org.apache.flink.runtime.taskexecutor.TaskExecutor-
 Received heartbeat request from e61c2b7d992f151936e21db1ca0d.

 2019-04-24 01:16:22,136 DEBUG
 org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
 Got ping response for sessionid: 0x25add5478fb2ec6 after 0ms

 2019-04-24 01:16:31,052 DEBUG
 org.apache.flink.runtime.taskexecutor.TaskExecutor-
 Received heartbeat request from e61c2b7d992f151936e21db1ca0d.

 2019-04-24 01:16:35,483 DEBUG
 org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
 Got ping response for sessionid: 0x25add5478fb2ec6 after 0ms

 On Tue, Apr 23, 2019 at 3:11 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> I see this in the TM pod
>
> 2019-04-23 19:08:41,828 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> Got ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>
> 2019-04-23 19:08:47,543 DEBUG
> 

Flink CLI

2019-04-24 Thread Steven Nelson
Hello!

I am working on automating our deployments to our Flink cluster. I had a
couple questions about the flink cli.

1) I thought there was an "update" command that would internally manage the
cancel with savepoint, upload new jar, restart from savepoint process.

2) Is there a way to get the Flink cli to output it's result in a json
format? Right now I would need to parse the results of the "flink list"
command to get the job id, cancel the job with savepoint, parse the results
of that to get the savepoint filename, then restore using that. Parsing the
output seems brittle to me.

Thought?
-Steve


Re: No zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-24 Thread Vishal Santoshi
Verified, I think we just need to make sure that it is documented :)

On Wed, Apr 24, 2019 at 9:47 AM Vishal Santoshi 
wrote:

> This makes total sense and actually is smart ( defensive ). Will test and
> report. I think though that this needs to be documented :)
>
> On Wed, Apr 24, 2019 at 6:03 AM Till Rohrmann 
> wrote:
>
>> Hi Vishal,
>>
>> it seems that the following is happening: You triggered the cancel with
>> savepoint command from via the REST call. This command is an asynchronous
>> operation which produces a result (the savepoint path). In order to deliver
>> asynchronous results to the caller, Flink waits before shutting down until
>> they are delivered or until it times out after 5 minutes. I assume that you
>> don't request the savepoint path from Flink via the returned URL from the
>> original request. This could either happen if you kill the CLI before its
>> done or if you have written your own method to trigger this operation.
>>
>> I guess we could add a flag for asynchronous operations which tells Flink
>> that their results don't need to get delivered to some client. If you would
>> like to have such a feature, then please open a JIRA issue for it.
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 24, 2019 at 3:49 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Anyione ?
>>>
>>>
>>>
>>> I think there some race condition .  These are the TM logs.. I am
>>> puzzled  b'coz in a larger pipe ( there are about 32 lots on 8 replicas and
>>> it works
>>>
>>>
>>>
>>>
>>> 2019-04-24 01:16:20,889 DEBUG
>>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>> Releasing local state under allocation id 5a853ef886e1c599f86b9503306fffd2.
>>>
>>> 2019-04-24 01:16:20,894 DEBUG
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
>>> JobManager connection for job .
>>>
>>> org.apache.flink.util.FlinkException: Stopping JobMaster for job
>>> EventCountJob().
>>>
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:355)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>
>>> at
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> 2019-04-24 01:16:20,895 INFO
>>> org.apache.flink.runtime.taskexecutor.JobLeaderService- Cannot
>>> reconnect to job  because it is not
>>> registered.
>>>
>>> 2019-04-24 01:16:21,053 DEBUG
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>>> Received heartbeat request from e61c2b7d992f151936e21db1ca0d.
>>>
>>> 2019-04-24 01:16:22,136 DEBUG
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>>> Got ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>>>
>>> 2019-04-24 01:16:31,052 DEBUG
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>>> Received heartbeat request from e61c2b7d992f151936e21db1ca0d.
>>>
>>> 2019-04-24 01:16:35,483 DEBUG
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>>> Got ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>>>
>>> On Tue, Apr 23, 2019 at 3:11 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 I see this in the TM pod

 2019-04-23 19:08:41,828 DEBUG
 org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
 Got ping response for sessionid: 0x15cc7f3d88466a5 after 0ms

 2019-04-23 19:08:47,543 DEBUG
 org.apache.flink.runtime.taskexecutor.TaskExecutor-
 Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.

 2019-04-23 19:08:55,175 DEBUG
 org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
 Got ping response for sessionid: 0x15cc7f3d88466a5 after 1ms

 

Re: No zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-24 Thread Vishal Santoshi
This makes total sense and actually is smart ( defensive ). Will test and
report. I think though that this needs to be documented :)

On Wed, Apr 24, 2019 at 6:03 AM Till Rohrmann  wrote:

> Hi Vishal,
>
> it seems that the following is happening: You triggered the cancel with
> savepoint command from via the REST call. This command is an asynchronous
> operation which produces a result (the savepoint path). In order to deliver
> asynchronous results to the caller, Flink waits before shutting down until
> they are delivered or until it times out after 5 minutes. I assume that you
> don't request the savepoint path from Flink via the returned URL from the
> original request. This could either happen if you kill the CLI before its
> done or if you have written your own method to trigger this operation.
>
> I guess we could add a flag for asynchronous operations which tells Flink
> that their results don't need to get delivered to some client. If you would
> like to have such a feature, then please open a JIRA issue for it.
>
> Cheers,
> Till
>
> On Wed, Apr 24, 2019 at 3:49 AM Vishal Santoshi 
> wrote:
>
>> Anyione ?
>>
>>
>>
>> I think there some race condition .  These are the TM logs.. I am
>> puzzled  b'coz in a larger pipe ( there are about 32 lots on 8 replicas and
>> it works
>>
>>
>>
>>
>> 2019-04-24 01:16:20,889 DEBUG
>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>> Releasing local state under allocation id 5a853ef886e1c599f86b9503306fffd2.
>>
>> 2019-04-24 01:16:20,894 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
>> JobManager connection for job .
>>
>> org.apache.flink.util.FlinkException: Stopping JobMaster for job
>> EventCountJob().
>>
>> at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:355)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> 2019-04-24 01:16:20,895 INFO
>> org.apache.flink.runtime.taskexecutor.JobLeaderService- Cannot
>> reconnect to job  because it is not
>> registered.
>>
>> 2019-04-24 01:16:21,053 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
>> heartbeat request from e61c2b7d992f151936e21db1ca0d.
>>
>> 2019-04-24 01:16:22,136 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>>
>> 2019-04-24 01:16:31,052 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
>> heartbeat request from e61c2b7d992f151936e21db1ca0d.
>>
>> 2019-04-24 01:16:35,483 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>>
>> On Tue, Apr 23, 2019 at 3:11 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I see this in the TM pod
>>>
>>> 2019-04-23 19:08:41,828 DEBUG
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>>> Got ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>>
>>> 2019-04-23 19:08:47,543 DEBUG
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>>> Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>>
>>> 2019-04-23 19:08:55,175 DEBUG
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>>> Got ping response for sessionid: 0x15cc7f3d88466a5 after 1ms
>>>
>>> 2019-04-23 19:08:57,548 DEBUG
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>>> Received heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>>
>>> 2019-04-23 19:09:07,543 DEBUG
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor-
>>> Received heartbeat 

Re: [DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-24 Thread Gary Yao
The idea is to also remove the rescaling code in the JobMaster. This will
make
it easier to remove the ExecutionGraph reference from the JobMaster which is
needed for the scheduling rework [1].

[1] https://issues.apache.org/jira/browse/FLINK-12231

On Wed, Apr 24, 2019 at 12:14 PM Shuai Xu  wrote:

> Will we only remove command support in client side or the code in job
> master will also be removed?
>
> Till Rohrmann  于2019年4月24日周三 下午4:12写道:
>
> > +1 for temporarily removing support for the modify command.
> >
> > Eventually, we have to add it again in order to support auto scaling. The
> > next time we add it, we should address the known limitations.
> >
> > Cheers,
> > Till
> >
> > On Wed, Apr 24, 2019 at 9:06 AM Paul Lam  wrote:
> >
> > > Hi Gary,
> > >
> > > + 1 to remove it for now. Actually some users are not aware of that
> it’s
> > > still experimental, and ask quite a lot about the problem it causes.
> > >
> > > Best,
> > > Paul Lam
> > >
> > > 在 2019年4月24日,14:49,Stephan Ewen  写道:
> > >
> > > Sounds reasonable to me. If it is a broken feature, then there is not
> > much
> > > value in it.
> > >
> > > On Tue, Apr 23, 2019 at 7:50 PM Gary Yao  wrote:
> > >
> > > Hi all,
> > >
> > > As the subject states, I am proposing to temporarily remove support for
> > > changing the parallelism of a job via the following syntax [1]:
> > >
> > >./bin/flink modify [job-id] -p [new-parallelism]
> > >
> > > This is an experimental feature that we introduced with the first
> rollout
> > > of
> > > FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:
> > >
> > >* Rescaling does not work with HA enabled [2]
> > >* New parallelism is not persisted, i.e., after a JobManager
> restart,
> > > the job
> > >  will be recovered with the initial parallelism
> > >
> > > Due to the above-mentioned issues, I believe that currently nobody uses
> > > "modify -p" to rescale their jobs in production. Moreover, the
> rescaling
> > > feature stands in the way of our current efforts to rework Flink's
> > > scheduling
> > > [3]. I therefore propose to remove the rescaling code for the time
> being.
> > > Note
> > > that it will still be possible to change the parallelism by taking a
> > > savepoint
> > > and restoring the job with a different parallelism [4].
> > >
> > > Any comments and suggestions will be highly appreciated.
> > >
> > > Best,
> > > Gary
> > >
> > > [1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
> > > [2] https://issues.apache.org/jira/browse/FLINK-8902
> > > [3] https://issues.apache.org/jira/browse/FLINK-10429
> > > [4]
> > >
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring
> > >
> > >
> > >
> >
>


Watermark for each key?

2019-04-24 Thread Lasse Nedergaard
Hi.

We work with IoT data and we have cases where the IoT-device delay data
transfer if it can't get network access. We would like to use table windows
aggregate function over each device to calculate some statistics, but for
windows aggregate functions to work we need to assign a watermark. This
watermark is general for all devices. We can set allow latency, but we
can't set it to months.
So what we need is to have a watermark for each device (key by) so the
window aggregate work on the timestamp delivered for the device and not the
global watermark.
Is that possible, or have anyone consider this feature?

Best

Lasse Nedergaard


RE: Flink 状态使用问题咨询

2019-04-24 Thread Shi Quan
主要是考虑是在从异常恢复场景下,业务state是否需要重新加载。如果不需要重新加载,就不要记录这么多时间用来判断了。



Sent from Mail for Windows 10




From: zhang yue 
Sent: Wednesday, April 24, 2019 8:29:07 PM
To: user-zh@flink.apache.org
Subject: Re: Flink 状态使用问题咨询

嗯,明白你的意思,initTime < openTime是做何考虑,什么情况下initTime < openTime会满足

> 在 2019年4月24日,下午8:16,Shi Quan  写道:
>
> initTime < openTime



Re: [EXTERNAL] Re: Looking for help in configuring Swift as State Backend

2019-04-24 Thread Till Rohrmann
I think you also need to specify a path for the checkpoint directory. Try
to set

state.checkpoints.dir: swift://spout-checkpoints.magellan/flink/checkpoints

Cheers,
Till

On Wed, Apr 24, 2019 at 2:58 PM PoolakkalMukkath, Shakir <
shakir_poolakkalmukk...@comcast.com> wrote:

> Hi Till, Thanks for the response. Yes, I looks at  the document. But still
> trying to figure out
>
>
>
> Let me summaries my config and what I did
>
>
>
>1. Copied flink-swift-fs-hadoop-1.6.2.jar to lib
>2. *flink-conf.yaml*
>
>
>
>
> #==
>
> # Fault tolerance and checkpointing
>
>
> #==
>
>
>
> # The backend that will be used to store operator state checkpoints if
>
> # checkpointing is enabled.
>
> #
>
> # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
>
> # .
>
> #
>
> state.backend: filesystem
>
>
>
> # Directory for checkpoints filesystem, when using any of the default
> bundled
>
> # state backends.
>
> state.checkpoints.dir: swift://spout-checkpoints.magellan
>
>
>
>
> #==
>
> # Hadoop
>
>
> #==
>
> fs.hdfs.hadoopconf: /app/stream/flink-standalone/hadoop/
>
> OR
>
>
>
> export HADOOP_CONF_DIR=/app/stream/flink-standalone/hadoop/
>
>
>
>
>
>
>
>1. And have the  core-site.xml in
>HADOOP_CONF_DIR=/app/stream/flink-standalone/hadoop/
>
>
>
> 
>
> 
>
>
>
> 
>
>
>
>   
>
> fs.swift.impl
>
> org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem
> 
>
>   
>
>
>
>   
>
> fs.swift.service.magellan.auth.url
>
> https://osvip-as-c01.ece.***.net:5000/v3
>
>   
>
>
>
>   
>
> fs.swift.service.magellan.username
>
> ***
>
>   
>
>
>
>   
>
> fs.swift.service.magellan.password
>
> *** 
>
>   
>
>
>
>   
>
> fs.swift.service.magellan.public
>
> true
>
>   
>
>
>
> 
>
>
>
>
>
> When I submit a job with Checkpointing enabled, getting the below error,
>
>
>
> java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
>
>at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>
>at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>
>at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>
>at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>
>at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
>at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
>
>at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
>
>at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>
>at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>
>at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>
>... 7 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not instantiate configured state backend
>
>at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
>
>at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>
>at
> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
>
>at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
>
>at
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
>
>at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
>
>... 10 more
>
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> Invalid configuration for the state backend
>
>at
> org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:48)
>
>at
> org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:121)
>
>at
> org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
>
>at
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
>
>   

Re: [EXTERNAL] Re: Looking for help in configuring Swift as State Backend

2019-04-24 Thread PoolakkalMukkath, Shakir
Hi Till, Thanks for the response. Yes, I looks at  the document. But still 
trying to figure out

Let me summaries my config and what I did


  1.  Copied flink-swift-fs-hadoop-1.6.2.jar to lib
  2.  flink-conf.yaml


#==

# Fault tolerance and checkpointing

#==



# The backend that will be used to store operator state checkpoints if

# checkpointing is enabled.

#

# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the

# .

#

state.backend: filesystem



# Directory for checkpoints filesystem, when using any of the default bundled

# state backends.

state.checkpoints.dir: swift://spout-checkpoints.magellan


#==
# Hadoop
#==
fs.hdfs.hadoopconf: /app/stream/flink-standalone/hadoop/

OR

export HADOOP_CONF_DIR=/app/stream/flink-standalone/hadoop/




  1.  And have the  core-site.xml in 
HADOOP_CONF_DIR=/app/stream/flink-standalone/hadoop/






  
fs.swift.impl
org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem
  

  
fs.swift.service.magellan.auth.url
https://osvip-as-c01.ece.***.net:5000/v3
  

  
fs.swift.service.magellan.username
***
  

  
fs.swift.service.magellan.password
*** 
  

  
fs.swift.service.magellan.public
true
  





When I submit a job with Checkpointing enabled, getting the below error,

java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Could not set up 
JobManager
   at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
   at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
   at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set 
up JobManager
   at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
   at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
   at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
   ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
instantiate configured state backend
   at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
   at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
   at 
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
   at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
   at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:294)
   at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
   ... 10 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: 
Invalid configuration for the state backend
   at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:48)
   at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:121)
   at 
org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
   at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
   ... 15 more
Caused by: java.lang.IllegalArgumentException: Cannot use the root directory 
for checkpoints.
   at 
org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.validatePath(AbstractFileStateBackend.java:195)
   at 
org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.(AbstractFileStateBackend.java:109)
   at 
org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.(AbstractFileStateBackend.java:95)
   at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:319)
   at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:200)
   at 

Re: May be useful: our reference document for "Understanding State in Flink"

2019-04-24 Thread Oytun Tez
Thank you all!

@David and @Fabian can guide me (or Deepak as well) to maintain this
document if they'd like. I can export HTML from this that we can easily
play with and put in docs.

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Wed, Apr 24, 2019 at 7:33 AM Deepak Sharma  wrote:

> I want to volunteer for maintaining or adding to this kind of document.
> Please do let me know if i can.
>
> Thanks
> Deepak
>
> On Wed, Apr 24, 2019 at 6:33 AM Deepak Sharma 
> wrote:
>
>>
>>
>> On Wed, Apr 24, 2019 at 5:14 AM Till Rohrmann 
>> wrote:
>>
>>> Thanks for sharing this resource with the community Oytun. It looks
>>> really helpful.
>>>
>>> I'm pulling in David and Fabian who work a lot on documentation. Maybe
>>> it's interesting for them to take a look at. The community had once the
>>> idea to set up a cook book with common Flink recipes but we never managed
>>> to get it properly started.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Apr 23, 2019 at 5:54 PM Oytun Tez  wrote:
>>>
 We keep a document with state-related use cases in our application,
 useful for onboarding new engineers in the application. See attached PDF.

 May be useful for others. And of course, corrections are welcome.
 (Couldn't share our Wiki page)


 ---
 Oytun Tez

 *M O T A W O R D*
 The World's Fastest Human Translation Platform.
 oy...@motaword.com — www.motaword.com

>>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: Flink 状态使用问题咨询

2019-04-24 Thread zhang yue
嗯,明白你的意思,initTime < openTime是做何考虑,什么情况下initTime < openTime会满足

> 在 2019年4月24日,下午8:16,Shi Quan  写道:
> 
> initTime < openTime



RE: Flink 状态使用问题咨询

2019-04-24 Thread Shi Quan
有做过类似的事情,不用侵入flink的源码。记录几个关键信息:

  1.  Function open的时间,openTime;
  2.  数据初始化的时间,initTime,可以用State保存;
  3.  真正的业务State



当有数据过来时,iff (null == initTime) || (initTime < openTime) 进行初始数据加载动作。





Sent from Mail for Windows 10




From: zhang yue 
Sent: Wednesday, April 24, 2019 6:21:57 PM
To: user-zh@flink.apache.org
Subject: Re: Flink 状态使用问题咨询

这种情况我需要改flink源码吗,还是自己实现一个自定义的state类就好了,还有在这个state类中怎么能获取到key呢

> 在 2019年4月17日,上午11:24,wenlong.lwl  写道:
>
> 可以封装一下state 的访问,从state get不到数据的时候,去数据库里取下,更新到state里
>
> On Tue, 16 Apr 2019 at 20:53, zhang yue  wrote:
>
>> 是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink state
>> 那现在对于这种场景有什么替代方案吗
>>
>>> 在 2019年4月16日,下午8:33,Congxian Qiu  写道:
>>>
>>> Hi
>>> 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer
>> 相关的事情,到时候就可以了
>>>
>>> Best, Congxian
>>> On Apr 16, 2019, 20:27 +0800, zhang yue , wrote:
 你好,我有一个keyed
>> state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。


 class LineItemStat extends RichFlatMapFunction {

 /**
 * The ValueState handle. The first field is the count, the second field
>> a running sum.
 */
 private transient MapState stat_value;

 @Override
 public void flatMap(ObjectNode input, Collector out) throws
>> Exception {

 // access the state value

 }

 @Override
 public void open(Configuration config) {
 MapStateDescriptor descriptor =
 new MapStateDescriptor(
 "stat_value",String.class, Long.class); // default value of the state,
>> if nothing was set
 stat_value = getRuntimeContext().getMapState(descriptor);
 }
 }

>>
>>



Re: Missing state in RocksDB checkpoints

2019-04-24 Thread Ning Shi
Till,

Thank you for escalating this to blocker. I agree that data loss is always a 
serious issue.

For reference, the workaround is to unchain the stateful operators. To make the 
new job be able to recover from previous checkpoint, we also had to change the 
UID of the operator that was missing state and recover with allow non-restored 
argument. Otherwise, it would fail with RocksDB errors on restore.

—
Ning

> On Apr 24, 2019, at 5:02 AM, Till Rohrmann  wrote:
> 
> Thanks for reporting this issue Ning. I think this is actually a blocker for 
> the next release and should be fixed right away. For future reference here is 
> the issue [1].
> 
> I've also pulled in Stefan who knows these components very well.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-12296
> 
> Cheers,
> Till
> 
>> On Tue, Apr 23, 2019 at 5:24 PM Ning Shi  wrote:
>> On Tue, 23 Apr 2019 10:53:52 -0400,
>> Congxian Qiu wrote:
>> > Sorry for the misleading, in the previous email, I just want to say the 
>> > problem is not caused by the UUID generation, it is caused by the 
>> > different operators share the same directory(because currentlyFlink uses 
>> > JobVertx as the directory)
>> 
>> Ah, thank you for the clarification, Congxian. That makes sense.
>> 
>> Ning


Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-24 Thread Sameer W
Global Windows is fine for this use case. I have used the same strategy.
You just define custom evictors and triggers and you are all good. Windows
are managed by keys, so as such as long as events are evicted from the
window, that counts towards reclaiming memory for the key+window
combination. Plus there is just window per key with Global Windows.

On Wed, Apr 24, 2019 at 7:47 AM M Singh  wrote:

> Hi Rong:
>
> Thanks for your answer.
>
> From what I understand the dynamic gap session windows are also created
> when the event is encountered.  I need to be able to change the window end
> time at a later time based on what other events are in that window.  One
> way to do this is to use GlobalWindows but then these are never deleted.
>
> Regarding CEP option - I believe that CEP patterns cannot be changed
> dynamically once they've been complied which limits it usage.
>
> Please feel free to correct me.
>
> Thanks for your help and pointers.
>
> On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong 
> wrote:
>
>
> Hi Mans,
>
> I am not sure what you meant by "dynamically change the end-time of a
> window. If you are referring to dynamically determines the firing time of
> the window, then it fits into the description of session window [1]:
> If you want to handle window end time dynamically, one way of which I can
> think of is the dynamic gap, session window [1] approach. with which you
> can specify the end-time of a window based on input elements. Provided that
> you are maintaining a session window.
> Another way to look at it is through the Flink-CEP library [2].
>
> Thanks,
> Rong
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
>
> On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:
>
> Hi:
>
> I am working on a project and need to change the end time of the window
> dynamically.  I want to find out if the end time of the window is used
> internally (for sorting windows/etc) except for handling watermarks that
> would cause problems if the end time was changed during run time after the
> window has been created even if no new event has arrived for that window.
>
> I don't want to use GlobalWindow since from my understanding it never gets
> destroyed.
>
> If there is any alternate way of dealing with this, please let me know.
>
> Thanks
>
> Mans
>
>


Re: Flink Control Stream

2019-04-24 Thread Till Rohrmann
Hi Dominik,

I think it is not possible to use Flink's AsyncFunction together with a
ConnectedStream or when you use BroadcastState. Therefore, it would be
necessary that you inject the control messages into your normal stream and
then filter them out in the AsyncFunction#asyncInvoke call.

Cheers,
Till

On Wed, Apr 24, 2019 at 11:05 AM Dominik Wosiński  wrote:

> Hey,
> I wanted to use the control stream to dynamically adjust parameters of the
> tasks. I know that it is possible to use *connect()* and *BroadcastState *to
> obtain such a thing. But I would like to have the possibility to control
> the parameters inside the *AsyncFunction. *Like specific timeout for HTTP
> client or the request address if it changes. I know that I could
> technically create join control stream and event stream into one stream and
> process it, but I was wondering If it would be possible to do this with any
> other mechanism?
>
> Thanks in advance,
> Best Regards.
> Dom.
>
>


Re: HA lock nodes, Checkpoints, and JobGraphs not being removed after failure

2019-04-24 Thread Till Rohrmann
Cross linking the dev ML thread [1]. Let us continue the discussion there.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/HA-lock-nodes-Checkpoints-and-JobGraphs-after-failure-td28432.html

Cheers,
Till

On Tue, Apr 23, 2019 at 9:52 AM dyana.rose  wrote:

> originally posted to the dev group, but it's a bit easy for things to get
> buried a bit there, and this may concern other HA users.
>
> Flink v1.7.1
>
> After a Flink reboot we've been seeing some unexpected issues with excess
> retained checkpoints not being able to be removed from ZooKeeper after a
> new checkpoint is created.
>
> I believe I've got my head around the role of ZK and lockNodes in
> Checkpointing after going through the code. Could you check my logic on
> this and add any insight, especially if I've got it wrong?
>
> The situation:
> 1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in HA
> with S3 as the backing store.
>
> 2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore has
> its own lockNode UUID. JM1 is elected leader.
>
> 3) We submit a job, that JobGraph lockNode is added to ZK using JM1's
> JobGraph lockNode.
>
> 4) Checkpoints start rolling in, latest 10 are retained in ZK using JM1's
> checkpoint lockNode. We continue running, and checkpoints are successfully
> being created and excess checkpoints removed.
>
> 5) Both JM1 and JM2 now are rebooted.
>
> 6) The JobGraph is recovered by the leader, the job restarts from the
> latest checkpoint.
>
> Now after every new checkpoint we see in the ZooKeeper logs:
> INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got
> user-level KeeperException when processing sessionid:0x1047715000d
> type:delete cxid:0x210 zxid:0x71091 txntype:-1 reqpath:n/a Error
> Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0057813
> Error:KeeperErrorCode = Directory not empty for
> /flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/005781
> with an increasing checkpoint id on each subsequent call.
>
> When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled,
> right? As the old checkpoints were created under the old UUID, the new JMs
> will never be able to remove the old retained checkpoints from ZooKeeper.
>
> Is that correct?
>
> If so, would this also happen with JobGraphs in the following situation
> (we saw this just recently where we had a JobGraph for a cancelled job
> still in ZK):
>
> Steps 1 through 3 above, then:
> 4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1
> restarts.
>
> 5) some time later while JM2 is still leader we hard cancel the job and
> restart the JMs
>
> In this case JM2 would successfully remove the job from s3, but because
> its lockNode is different from JM1 it cannot delete the lock file in the
> jobgraph folder and so can’t remove the jobgraph. Then Flink restarts and
> tries to process the JobGraph it has found, but the S3 files have been
> deleted.
>
> Possible related closed issues (fixes went in v1.7.0):
> https://issues.apache.org/jira/browse/FLINK-10184 and
> https://issues.apache.org/jira/browse/FLINK-10255
>
> Thanks for any insight,
> Dyana
>


Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-24 Thread M Singh
 Hi Rong:
Thanks for your answer.
>From what I understand the dynamic gap session windows are also created when 
>the event is encountered.  I need to be able to change the window end time at 
>a later time based on what other events are in that window.  One way to do 
>this is to use GlobalWindows but then these are never deleted.

Regarding CEP option - I believe that CEP patterns cannot be changed 
dynamically once they've been complied which limits it usage.
 Please feel free to correct me. 

Thanks for your help and pointers.

On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong  
wrote:  
 
 Hi Mans,
I am not sure what you meant by "dynamically change the end-time of a window. 
If you are referring to dynamically determines the firing time of the window, 
then it fits into the description of session window [1]: If you want to handle 
window end time dynamically, one way of which I can think of is the dynamic 
gap, session window [1] approach. with which you can specify the end-time of a 
window based on input elements. Provided that you are maintaining a session 
window. Another way to look at it is through the Flink-CEP library [2]. 
Thanks,Rong

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows[2]
 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:

Hi:
I am working on a project and need to change the end time of the window 
dynamically.  I want to find out if the end time of the window is used 
internally (for sorting windows/etc) except for handling watermarks that would 
cause problems if the end time was changed during run time after the window has 
been created even if no new event has arrived for that window.

I don't want to use GlobalWindow since from my understanding it never gets 
destroyed.

If there is any alternate way of dealing with this, please let me know.

Thanks
Mans

  

Re: State Migration with RocksDB MapState

2019-04-24 Thread Cliff Resnick
Hi Gordon,

I noticed there has been no movement on this issue and I'm wondering if I
can find some way to work around this.
My MapState value is a case class container of Avro-generated
SpecificRecords. If one SpecificRecord changes I am stuck.

>From the issue It seems like the blocker is around evolving the MapState
key type.  That may be a nasty problem, but my key type is stable and will
never change. What do you think the level of difficulty would be to add
support for evolving only the value?

Also, if I use GenericRecord instead of SpecificRecord will the need for
schema evolution still be triggered? Or does it actually go down to the
avro schema rather than just the class serialVersionUID?






On Mon, Mar 18, 2019 at 1:10 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Cliff,
>
> Thanks for bringing this up!
> AFAIK, this wouldn't be a long-term blocker. I've just opened a JIRA to
> track this [1].
>
> As explained in the JIRA ticket, the main reason this is disallowed in the
> initial support for state schema evolution was due to how migration was
> implemented in the RocksDB state backend.
> Technically speaking, enabling this in the future is definitely possible.
>
> Cheers,
> Gordon
>
> [1]  https://issues.apache.org/jira/browse/FLINK-11947
>
> On Mon, Mar 18, 2019 at 11:20 AM Cliff Resnick  wrote:
>
>> After trying out state migration in 1.8 rc2 I ran into this hard stop
>> below. The comment does not give an indication why rocksdb map state cannot
>> be migrated, and I'm wondering what the status is, since we do need this
>> functionality and would like to know if this is a long-term blocker or not.
>> Anyone know?
>>
>>
>> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
>>
>


Re: May be useful: our reference document for "Understanding State in Flink"

2019-04-24 Thread Deepak Sharma
I want to volunteer for maintaining or adding to this kind of document.
Please do let me know if i can.

Thanks
Deepak

On Wed, Apr 24, 2019 at 6:33 AM Deepak Sharma  wrote:

>
>
> On Wed, Apr 24, 2019 at 5:14 AM Till Rohrmann 
> wrote:
>
>> Thanks for sharing this resource with the community Oytun. It looks
>> really helpful.
>>
>> I'm pulling in David and Fabian who work a lot on documentation. Maybe
>> it's interesting for them to take a look at. The community had once the
>> idea to set up a cook book with common Flink recipes but we never managed
>> to get it properly started.
>>
>> Cheers,
>> Till
>>
>> On Tue, Apr 23, 2019 at 5:54 PM Oytun Tez  wrote:
>>
>>> We keep a document with state-related use cases in our application,
>>> useful for onboarding new engineers in the application. See attached PDF.
>>>
>>> May be useful for others. And of course, corrections are welcome.
>>> (Couldn't share our Wiki page)
>>>
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: May be useful: our reference document for "Understanding State in Flink"

2019-04-24 Thread Deepak Sharma
On Wed, Apr 24, 2019 at 5:14 AM Till Rohrmann  wrote:

> Thanks for sharing this resource with the community Oytun. It looks really
> helpful.
>
> I'm pulling in David and Fabian who work a lot on documentation. Maybe
> it's interesting for them to take a look at. The community had once the
> idea to set up a cook book with common Flink recipes but we never managed
> to get it properly started.
>
> Cheers,
> Till
>
> On Tue, Apr 23, 2019 at 5:54 PM Oytun Tez  wrote:
>
>> We keep a document with state-related use cases in our application,
>> useful for onboarding new engineers in the application. See attached PDF.
>>
>> May be useful for others. And of course, corrections are welcome.
>> (Couldn't share our Wiki page)
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>

-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Flink 状态使用问题咨询

2019-04-24 Thread zhang yue
这种情况我需要改flink源码吗,还是自己实现一个自定义的state类就好了,还有在这个state类中怎么能获取到key呢

> 在 2019年4月17日,上午11:24,wenlong.lwl  写道:
> 
> 可以封装一下state 的访问,从state get不到数据的时候,去数据库里取下,更新到state里
> 
> On Tue, 16 Apr 2019 at 20:53, zhang yue  wrote:
> 
>> 是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink state
>> 那现在对于这种场景有什么替代方案吗
>> 
>>> 在 2019年4月16日,下午8:33,Congxian Qiu  写道:
>>> 
>>> Hi
>>> 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer
>> 相关的事情,到时候就可以了
>>> 
>>> Best, Congxian
>>> On Apr 16, 2019, 20:27 +0800, zhang yue , wrote:
 你好,我有一个keyed
>> state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
 
 
 class LineItemStat extends RichFlatMapFunction {
 
 /**
 * The ValueState handle. The first field is the count, the second field
>> a running sum.
 */
 private transient MapState stat_value;
 
 @Override
 public void flatMap(ObjectNode input, Collector out) throws
>> Exception {
 
 // access the state value
 
 }
 
 @Override
 public void open(Configuration config) {
 MapStateDescriptor descriptor =
 new MapStateDescriptor(
 "stat_value",String.class, Long.class); // default value of the state,
>> if nothing was set
 stat_value = getRuntimeContext().getMapState(descriptor);
 }
 }
 
>> 
>> 



Re: May be useful: our reference document for "Understanding State in Flink"

2019-04-24 Thread Till Rohrmann
Thanks for sharing this resource with the community Oytun. It looks really
helpful.

I'm pulling in David and Fabian who work a lot on documentation. Maybe it's
interesting for them to take a look at. The community had once the idea to
set up a cook book with common Flink recipes but we never managed to get it
properly started.

Cheers,
Till

On Tue, Apr 23, 2019 at 5:54 PM Oytun Tez  wrote:

> We keep a document with state-related use cases in our application, useful
> for onboarding new engineers in the application. See attached PDF.
>
> May be useful for others. And of course, corrections are welcome.
> (Couldn't share our Wiki page)
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>


Re: Fast restart of a job with a large state

2019-04-24 Thread Till Rohrmann
Hi Sergey,

at the moment neither local nor incremental savepoints are supported in
Flink afaik. There were some ideas wrt incremental savepoints floating
around in the community but nothing concrete yet.

Cheers,
Till

On Tue, Apr 23, 2019 at 6:58 PM Sergey Zhemzhitsky 
wrote:

> Hi Stefan, Paul,
>
> Thanks for the tips! Currently I have not tried neither rescaling from
> checkpoints nor task local recovery. Now it's a subject to test.
>
> In case it will be necessary not to just rescale a job, but also to change
> its DAG - is there a way to have something like let's call it "local
> savepoints" or "incremental savepoints" to prevent the whole state
> transferring to and from a distributed storage?
>
> Kind Regards,
> Sergey
>
>
> On Thu, Apr 18, 2019, 13:22 Stefan Richter 
> wrote:
>
>> Hi,
>>
>> If rescaling is the problem, let me clarify that you can currently
>> rescale from savepoints and all types of checkpoints (including
>> incremental). If that was the only problem, then there is nothing to worry
>> about - the documentation is only a bit conservative about this because we
>> will not commit to an APU that all future types checkpoints will be
>> resealable. But currently they are all, and this is also very unlikely to
>> change anytime soon.
>>
>> Paul, just to comment on your suggestion as well, local recovery would
>> only help with failover. 1) It does not help for restarts by the user and
>> 2) also does not work for rescaling (2) is a consequence of 1) because
>> failover never rescales, only restarts).
>>
>> Best,
>> Stefan
>>
>> On 18. Apr 2019, at 12:07, Paul Lam  wrote:
>>
>> The URL in my previous mail is wrong, and it should be:
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
>>
>> Best,
>> Paul Lam
>>
>> 在 2019年4月18日,18:04,Paul Lam  写道:
>>
>> Hi,
>>
>> Have you tried task local recovery [1]?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>
>> Best,
>> Paul Lam
>>
>> 在 2019年4月17日,17:46,Sergey Zhemzhitsky  写道:
>>
>> Hi Flinkers,
>>
>> Operating different flink jobs I've discovered that job restarts with
>> a pretty large state (in my case this is up to 100GB+) take quite a
>> lot of time. For example, to restart a job (e.g. to update it) the
>> savepoint is created, and in case of savepoints all the state seems to
>> be pushed into the distributed store (hdfs in my case) when stopping a
>> job and pulling this state back when starting the new version of the
>> job.
>>
>> What I've found by the moment trying to speed up job restarts is:
>> - using external retained checkpoints [1]; the drawback is that the
>> job cannot be rescaled during restart
>> - using external state and storage with the stateless jobs; the
>> drawback is the necessity of additional network hops to this storage.
>>
>> So I'm wondering whether there are any best practices community knows
>> and uses to cope with the cases like this?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>
>>
>>
>>
>>


Re: Looking for help in configuring Swift as State Backend

2019-04-24 Thread Till Rohrmann
Hi Shakir,

have you checked out Flink's documentation for Filesystems [1]? What is the
problem you are observing?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems.html

Cheers,
Till

On Tue, Apr 23, 2019 at 9:30 PM PoolakkalMukkath, Shakir <
shakir_poolakkalmukk...@comcast.com> wrote:

> Hi,
>
>
>
> I am looking for some help in configuring the Swift Fs as State Backend. I
> am unable to configure it, let me know if anyone has prior done this or
> knowledge to help me
>
> Do we still need to run an HDFS to use this feature ?
>
>
>
> Thanks,
>
> Shakir
>


Re: No zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-24 Thread Till Rohrmann
Hi Vishal,

it seems that the following is happening: You triggered the cancel with
savepoint command from via the REST call. This command is an asynchronous
operation which produces a result (the savepoint path). In order to deliver
asynchronous results to the caller, Flink waits before shutting down until
they are delivered or until it times out after 5 minutes. I assume that you
don't request the savepoint path from Flink via the returned URL from the
original request. This could either happen if you kill the CLI before its
done or if you have written your own method to trigger this operation.

I guess we could add a flag for asynchronous operations which tells Flink
that their results don't need to get delivered to some client. If you would
like to have such a feature, then please open a JIRA issue for it.

Cheers,
Till

On Wed, Apr 24, 2019 at 3:49 AM Vishal Santoshi 
wrote:

> Anyione ?
>
>
>
> I think there some race condition .  These are the TM logs.. I am puzzled
> b'coz in a larger pipe ( there are about 32 lots on 8 replicas and it works
>
>
>
>
> 2019-04-24 01:16:20,889 DEBUG
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> Releasing local state under allocation id 5a853ef886e1c599f86b9503306fffd2.
>
> 2019-04-24 01:16:20,894 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
> JobManager connection for job .
>
> org.apache.flink.util.FlinkException: Stopping JobMaster for job
> EventCountJob().
>
> at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:355)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 2019-04-24 01:16:20,895 INFO
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Cannot
> reconnect to job  because it is not
> registered.
>
> 2019-04-24 01:16:21,053 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from e61c2b7d992f151936e21db1ca0d.
>
> 2019-04-24 01:16:22,136 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>
> 2019-04-24 01:16:31,052 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from e61c2b7d992f151936e21db1ca0d.
>
> 2019-04-24 01:16:35,483 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>
> On Tue, Apr 23, 2019 at 3:11 PM Vishal Santoshi 
> wrote:
>
>> I see this in the TM pod
>>
>> 2019-04-23 19:08:41,828 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>
>> 2019-04-23 19:08:47,543 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>> 2019-04-23 19:08:55,175 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x15cc7f3d88466a5 after 1ms
>>
>> 2019-04-23 19:08:57,548 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>> 2019-04-23 19:09:07,543 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>> 2019-04-23 19:09:08,523 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>
>> 2019-04-23 19:09:17,542 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
>> heartbeat request from 

Re: metric does not display on web

2019-04-24 Thread Chesnay Schepler

"No metrics available" indicates that either
a) metrics have not been queried yet or
b) metrics can not be transferred from the TaskManagers to the JobManager.

Regarding the first option, how long have you waited for metrics to show 
up? It may take a bit for metrics to be available (around 10 seconds).


As for the second, you'll have to set the log level to DEBUG and search 
for metric related errors.


On 19/04/2019 08:23, sora wrote:

Hi all,
I am trying to monitor my flink application, so I add two metric in my 
application.
But I can not see any information on the web. The metric tab says "No 
metrics available".

Do I miss any config?
My flink version: 1.7.2
My example code:
def main(args: Array[String]) {

   val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val text = env.socketTextStream("HOST", )
   val counts = text.flatMap(new RichFlatMapFunction[String, String] {
 private var counter: Counter = _
 private var meter: org.apache.flink.metrics.Meter = _

 override def open(parameters: Configuration):Unit = {
   super.open(parameters)
   counter = getRuntimeContext.getMetricGroup.counter("recordCounter")
   meter = getRuntimeContext.getMetricGroup.meter("recordMeter", new 
DropwizardMeterWrapper(new com.codahale.metrics.Meter()))
 }

 override def flatMap(value:String, out: Collector[String]):Unit = {
   val result = value.split("\\W+").filter(_.nonEmpty)
   result.foreach(out.collect)
   counter.inc(result.length)
   meter.markEvent(result.length)
 }
   })
 .map {
   (_, 1L)
 }
 .keyBy(_._1)
 .timeWindow(Time.seconds(5))
 .sum(1)
 .map {
   _.toString()
 }

   counts.addSink(new SocketClientSink[String]("HOST", , new 
SimpleStringSchema))

   env.execute("Scala SocketTextStreamWordCount Example")
}





Flink Control Stream

2019-04-24 Thread Dominik Wosiński
Hey,
I wanted to use the control stream to dynamically adjust parameters of the
tasks. I know that it is possible to use *connect()* and *BroadcastState *to
obtain such a thing. But I would like to have the possibility to control
the parameters inside the *AsyncFunction. *Like specific timeout for HTTP
client or the request address if it changes. I know that I could
technically create join control stream and event stream into one stream and
process it, but I was wondering If it would be possible to do this with any
other mechanism?

Thanks in advance,
Best Regards.
Dom.


Re: Error restoring from checkpoint on Flink 1.8

2019-04-24 Thread Till Rohrmann
For future reference here is a cross link to the referred ML thread
discussion [1].

[1]
http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E

Cheers,
Till

On Wed, Apr 24, 2019 at 4:00 AM Ning Shi  wrote:

> Hi Congxian,
>
> I think I have figured out the issue. It's related to the checkpoint
> directory
> collision issue you responded to in the other thread. We reproduced this
> bug on
> 1.6.1 after unchaining the operators.
>
> There are two stateful operators in the chain, one is a
> CoBroadcastWithKeyedOperator, the other one is a StreamMapper. The
> CoBroadcastWithKeyedOperator creates timer states in RocksDB, the latter
> doesn’t. Because of the checkpoint directory collision bug, we always end
> up
> saving the states for CoBroadcastWithKeyedOperator.
>
> After breaking these two operators apart, they try to restore from the
> same set
> of saved states. When the StreamMapper opens the RocksDB files, it doesn’t
> care
> about any of the column families in there, including the timer states.
> Hence the
> error.
>
> --
> Ning
>


Re: get custom gauge metric from WebMonitorEndpoint

2019-04-24 Thread Chesnay Schepler
You should be able to get the value of your custom metric. You'll have 
to set the log level to DEBUG and scan the logs for metric-related errors.


On 23/04/2019 22:25, Georgi Stoyanov wrote:


I've got custom metric ->

||

And I'm using them as suggested in the documentation ->

||
|
|getRuntimeContext().getMetricGroup().gauge("MyCustomMetric", new 
TestMetric());|||

|

I want to get this metric with GET method, but so far I tried almost 
everything in the API documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html 
) but didn't find that metric. Do you know how (or even could I) get 
that custom metric via API?




Kind Regards,
Georgi






Re: Missing state in RocksDB checkpoints

2019-04-24 Thread Till Rohrmann
Thanks for reporting this issue Ning. I think this is actually a blocker
for the next release and should be fixed right away. For future reference
here is the issue [1].

I've also pulled in Stefan who knows these components very well.

[1] https://issues.apache.org/jira/browse/FLINK-12296

Cheers,
Till

On Tue, Apr 23, 2019 at 5:24 PM Ning Shi  wrote:

> On Tue, 23 Apr 2019 10:53:52 -0400,
> Congxian Qiu wrote:
> > Sorry for the misleading, in the previous email, I just want to say the
> problem is not caused by the UUID generation, it is caused by the different
> operators share the same directory(because currentlyFlink uses JobVertx as
> the directory)
>
> Ah, thank you for the clarification, Congxian. That makes sense.
>
> Ning
>


Flink Stream SQL group by TUMBLE(rowtime,)

2019-04-24 Thread 邵志鹏
大家好,问题求助:


事件时间使用EventTime,默认Checkpoint【没有offset从earliest开始,有则从offset开始】。


assignTimestampsAndWatermarks,1、不使用Watermark,即默认eventtime时间戳;2、使用官方max取最大值;两种情况问题基本相同:
问题描述:
比如,发送40条数据,窗口消费33条。另外7条,需要继续发送新的数据,才会被消费掉,即便重启程序-也要发送新的数据,才会消费上次"未及时"消费的数据,而不是自动从上一次的offset+1开始。


SQL:
SELECT astyle, TUMBLE_START(rowtime, INTERVAL '10' SECOND) time_start, 
TUMBLE_END(rowtime, INTERVAL '10' SECOND) time_end, SUM(energy) AS sum_energy, 
CAST(COUNT(aid) AS INT) AS cnt, CAST(AVG(age) AS INT) AS avg_age FROM t_pojo 
GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), astyle


assignTimestampsAndWatermarks,3、使用System.currentTimeMillis()作为Watermark,消息消费是及时了,但是又有新的问题:
即程序启动后,相当于从latest开始消费消息,不能消费未消费过的offset【即使换掉group.id消费者组也是如此】。


https://stackoverflow.com/questions/55499764/how-to-let-flink-flush-last-line-to-sink-when-producerkafka-does-not-produce-n
https://stackoverflow.com/questions/55779711/why-not-on-time-when-i-consumed-kafka-message-using-flink-streaming-sql-group-by


另外,UTC时区的问题,目前是extractTimestamp的时候,+2880。很想知道大神们的解决方法是什么,或者就是保存UTC时间,然后结果数据对外的时候再做处理?





Re: How to implement custom stream operator over a window? And after the Count-Min Sketch?

2019-04-24 Thread Felipe Gutierrez
Hi Rong,

thanks for your reply. I guess I already did something regarding what you
have told to me. I have one example on this application [1], which uses
this state [2]  and computes a CountMinSketch [3].

I am seeking how to implement my own operator over a window in order to
have more fine-grained control over it and learn with it. And hopefully,
building a path to contribute to Flink in the future [4].

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L69
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L182
[3]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/util/CountMinSketch.java
[4] https://issues.apache.org/jira/browse/FLINK-2147

Best,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Wed, Apr 24, 2019 at 2:06 AM Rong Rong  wrote:

> Hi Felipe,
>
> In a short glance, the question can depend on how your window is (is there
> any overlap like sliding window) and how many data you would like to
> process.
>
> In general, you can always buffer all the data into a ListState and apply
> your window function by iterating through all those buffered elements [1].
> Provided that the data size is small enough to be hold efficiently in the
> state-backend.
> If this algorithm has some sort of pre-aggregation that can simplify the
> buffering through an incremental, orderless aggregation, you can also think
> about using [2].
> With these two approaches, you do not necessarily need to implement your
> own window operator (extending window operator can be tricky), and you also
> have access to the internal state [3].
>
> Hope these helps your exploration.
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>
> On Tue, Apr 23, 2019 at 8:16 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi,
>>
>> I want to implement my own operator that computes the Count-Min Sketch
>> over a window in Flink. Then, I found this Jira issue [1]
>>  which is exactly what
>> I want. I believe that I have to work out my skills to arrive at a mature
>> solution.
>>
>> So, the first thing that comes to my mind is to create my custom operator
>> like the AggregateApplyWindowFunction [2]
>> .
>> Through this I can create the summary of my data over a window.
>>
>> Also, I found this custom JoinOperator example by Till Rohrmann [3]
>>  which I think I can base
>> my implementation since it is done over a DataStream.
>>
>> What are your suggestions to me in order to start to implement a custom
>> stream operator which computes Count-Min Sketch? Do you have any custom
>> operator over window/keyBy that I can learn with the source code?
>>
>> ps.: I have implemented (looking at Blink source code) this a custom
>> Combiner [4]
>> 
>> (map-combiner-reduce) operator.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-2147
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.html
>> [3] https://github.com/tillrohrmann/custom-join
>> [4]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operator/AbstractRichMapStreamBundleOperator.java
>>
>> Thanks,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>


Re: [DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-24 Thread Till Rohrmann
+1 for temporarily removing support for the modify command.

Eventually, we have to add it again in order to support auto scaling. The
next time we add it, we should address the known limitations.

Cheers,
Till

On Wed, Apr 24, 2019 at 9:06 AM Paul Lam  wrote:

> Hi Gary,
>
> + 1 to remove it for now. Actually some users are not aware of that it’s
> still experimental, and ask quite a lot about the problem it causes.
>
> Best,
> Paul Lam
>
> 在 2019年4月24日,14:49,Stephan Ewen  写道:
>
> Sounds reasonable to me. If it is a broken feature, then there is not much
> value in it.
>
> On Tue, Apr 23, 2019 at 7:50 PM Gary Yao  wrote:
>
> Hi all,
>
> As the subject states, I am proposing to temporarily remove support for
> changing the parallelism of a job via the following syntax [1]:
>
>./bin/flink modify [job-id] -p [new-parallelism]
>
> This is an experimental feature that we introduced with the first rollout
> of
> FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:
>
>* Rescaling does not work with HA enabled [2]
>* New parallelism is not persisted, i.e., after a JobManager restart,
> the job
>  will be recovered with the initial parallelism
>
> Due to the above-mentioned issues, I believe that currently nobody uses
> "modify -p" to rescale their jobs in production. Moreover, the rescaling
> feature stands in the way of our current efforts to rework Flink's
> scheduling
> [3]. I therefore propose to remove the rescaling code for the time being.
> Note
> that it will still be possible to change the parallelism by taking a
> savepoint
> and restoring the job with a different parallelism [4].
>
> Any comments and suggestions will be highly appreciated.
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
> [2] https://issues.apache.org/jira/browse/FLINK-8902
> [3] https://issues.apache.org/jira/browse/FLINK-10429
> [4]
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring
>
>
>


Re: [DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-24 Thread Paul Lam
Hi Gary,

+ 1 to remove it for now. Actually some users are not aware of that it’s still 
experimental, and ask quite a lot about the problem it causes. 

Best,
Paul Lam

> 在 2019年4月24日,14:49,Stephan Ewen  写道:
> 
> Sounds reasonable to me. If it is a broken feature, then there is not much
> value in it.
> 
> On Tue, Apr 23, 2019 at 7:50 PM Gary Yao  wrote:
> 
>> Hi all,
>> 
>> As the subject states, I am proposing to temporarily remove support for
>> changing the parallelism of a job via the following syntax [1]:
>> 
>>./bin/flink modify [job-id] -p [new-parallelism]
>> 
>> This is an experimental feature that we introduced with the first rollout
>> of
>> FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:
>> 
>>* Rescaling does not work with HA enabled [2]
>>* New parallelism is not persisted, i.e., after a JobManager restart,
>> the job
>>  will be recovered with the initial parallelism
>> 
>> Due to the above-mentioned issues, I believe that currently nobody uses
>> "modify -p" to rescale their jobs in production. Moreover, the rescaling
>> feature stands in the way of our current efforts to rework Flink's
>> scheduling
>> [3]. I therefore propose to remove the rescaling code for the time being.
>> Note
>> that it will still be possible to change the parallelism by taking a
>> savepoint
>> and restoring the job with a different parallelism [4].
>> 
>> Any comments and suggestions will be highly appreciated.
>> 
>> Best,
>> Gary
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
>> [2] https://issues.apache.org/jira/browse/FLINK-8902
>> [3] https://issues.apache.org/jira/browse/FLINK-10429
>> [4]
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring
>> 



Re: [DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-24 Thread Stephan Ewen
Sounds reasonable to me. If it is a broken feature, then there is not much
value in it.

On Tue, Apr 23, 2019 at 7:50 PM Gary Yao  wrote:

> Hi all,
>
> As the subject states, I am proposing to temporarily remove support for
> changing the parallelism of a job via the following syntax [1]:
>
> ./bin/flink modify [job-id] -p [new-parallelism]
>
> This is an experimental feature that we introduced with the first rollout
> of
> FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:
>
> * Rescaling does not work with HA enabled [2]
> * New parallelism is not persisted, i.e., after a JobManager restart,
> the job
>   will be recovered with the initial parallelism
>
> Due to the above-mentioned issues, I believe that currently nobody uses
> "modify -p" to rescale their jobs in production. Moreover, the rescaling
> feature stands in the way of our current efforts to rework Flink's
> scheduling
> [3]. I therefore propose to remove the rescaling code for the time being.
> Note
> that it will still be possible to change the parallelism by taking a
> savepoint
> and restoring the job with a different parallelism [4].
>
> Any comments and suggestions will be highly appreciated.
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
> [2] https://issues.apache.org/jira/browse/FLINK-8902
> [3] https://issues.apache.org/jira/browse/FLINK-10429
> [4]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring
>


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-24 Thread Jeff Zhang
Hi Till,

IMHO, allow adding hooks involves 2 steps.
1. Provide hook interface, and call these hook in flink (ClusterClient) at
the right place. This should be done by framework (flink)
2. Implement new hook implementation and add/register them into
framework(flink)

What I am doing is step 1 which should be done by flink, step 2 is done by
users. But IIUC, your suggestion of using custom ClusterClient seems mixing
these 2 steps together. Say I'd like to add new hooks, I have to implement
a new custom ClusterClient, add new hooks and call them in the custom
ClusterClient at the right place.
This doesn't make sense to me. For a user who want to add hooks, he is not
supposed to understand the mechanism of ClusterClient, and should not touch
ClusterClient. What do you think ?




Till Rohrmann  于2019年4月23日周二 下午4:24写道:

> I think we should not expose the ClusterClient configuration via the
> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
> is effectively the same as exposing the JobListener interface directly on
> the ExecutionEnvironment. Instead I think it could be possible to provide a
> ClusterClient factory which is picked up from the Configuration or some
> other mechanism for example. That way it would not need to be exposed via
> the ExecutionEnvironment at all.
>
> Cheers,
> Till
>
> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>
>> >>>  The ExecutionEnvironment is usually used by the user who writes the
>> code and this person (I assume) would not be really interested in these
>> callbacks.
>>
>> Usually ExecutionEnvironment is used by the user who write the code, but
>> it doesn't needs to be created and configured by this person. e.g. in
>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
>> use ExecutionEnvironment to write flink program.  You are right that the
>> end user would not be interested in these callback, but the third party
>> library that integrate with zeppelin would be interested in these callbacks.
>>
>> >>> In your case, it could be sufficient to offer some hooks for the
>> ClusterClient or being able to provide a custom ClusterClient.
>>
>> Actually in my initial PR (https://github.com/apache/flink/pull/8190), I
>> do pass JobListener to ClusterClient and invoke it there.
>> But IMHO, ClusterClient is not supposed be a public api for users.
>> Instead JobClient is the public api that user should use to control job. So
>> adding hooks to ClusterClient directly and provide a custom ClusterClient
>> doesn't make sense to me. IIUC, you are suggesting the following approach
>>  env.getClusterClient().addJobListener(jobListener)
>> but I don't see its benefit compared to this.
>>  env.addJobListener(jobListener)
>>
>> Overall, I think adding hooks is orthogonal with fine grained job
>> control. And I agree that we should refactor the flink client component,
>> but I don't think it would affect the JobListener interface. What do you
>> think ?
>>
>>
>>
>>
>> Till Rohrmann  于2019年4月18日周四 下午8:57写道:
>>
>>> Thanks for starting this discussion Jeff. I can see the need for
>>> additional hooks for third party integrations.
>>>
>>> The thing I'm wondering is whether we really need/want to expose a
>>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
>>> usually used by the user who writes the code and this person (I assume)
>>> would not be really interested in these callbacks. If he would, then one
>>> should rather think about a better programmatic job control where the
>>> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
>>> Moreover, we would effectively make this part of the public API and every
>>> implementation would need to offer it.
>>>
>>> In your case, it could be sufficient to offer some hooks for the
>>> ClusterClient or being able to provide a custom ClusterClient. The
>>> ClusterClient is the component responsible for the job submission and
>>> retrieval of the job result and, hence, would be able to signal when a job
>>> has been submitted or completed.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>>>
 Hi Jeff,

 I personally like this proposal. From the perspective of
 programmability, the JobListener can make the third program more
 appreciable.

 The scene where I need the listener is the Flink cube engine for Apache
 Kylin. In the case, the Flink job program is embedded into the Kylin's
 executable context.

 If we could have this listener, it would be easier to integrate with
 Kylin.

 Best,
 Vino

 Jeff Zhang  于2019年4月18日周四 下午1:30写道:

>
> Hi All,
>
> I created FLINK-12214
>  for adding
> JobListener (hook) in flink job lifecycle. Since this is a new public api
> for flink, so I'd like to discuss it more widely in community to get more
> feedback.
>
> The background