Re: Jobmanager not properly fenced when killed by YARN RM

2019-12-16 Thread Yang Wang
Hi Paul,

Thanks for sharing your analysis. I think you are right. When the Yarn
NodeManager crashed, the first jobmanager running on it was not killed. However,
when the Yarn ResourceManager found the NodeManager was lost, it launched a new
jobmanager attempt. Before FLINK-14010, only the FlinkResourceManager would exit
when it received onShutdownRequest from the Yarn ResourceManager. So the
Dispatcher and JobManager launched in the new AM could not be granted
leadership properly and would restart repeatedly.



Best,
Yang

Paul Lam wrote on Tue, Dec 17, 2019 at 12:35 PM:

> Hi Yang,
>
> Thanks a lot for your reasoning. You are right about the YARN cluster. The
> NodeManager crashed, and that's why the RM killed the containers on that
> machine after a heartbeat timeout (about 10 min) with the NodeManager.
>
> Actually, the attached logs are from the first/old jobmanager, and I
> couldn't find the log about YARN application unregistration in any of the
> logs. I think maybe the Flink resource manager was not trying to unregister
> the application (which would also remove the HA service state) when it got a
> shutdown request, because the Flink job was running well at that moment.
>
> I dug a bit deeper and found that the root cause might be that the Flink
> ResourceManager was taking too long to shut down and it didn't change the
> Flink job status (so the JobManager would keep working even after the AM was
> killed by the RM). I've also found the related issue mentioned in my previous
> mail. [1]
>
> Thanks a lot for your help!
>
> [1] https://issues.apache.org/jira/browse/FLINK-14010
>
> Best,
> Paul Lam
>
> On Dec 16, 2019, at 20:35, Yang Wang wrote:
>
> Hi Paul,
>
> I found lots of "Failed to stop Container" logs in the jobmanager.log. It
> seems that the Yarn cluster is not working normally. So the Flink
> YarnResourceManager may also have failed to unregister the app. If the app
> is unregistered successfully, no new attempt will be started.
>
> The second and following jobmanager attempts started and failed because the
> staging directory on HDFS had been cleaned up. So could you find the log
> "Could not unregister the application master" in all the jobmanager logs,
> including the first one?
>
>
>
> Best,
> Yang
>
> Paul Lam wrote on Mon, Dec 16, 2019 at 7:56 PM:
>
>> Hi Yang,
>>
>> Thanks a lot for your reply!
>>
>> I might not have made myself clear, but the new jobmanager in the new YARN
>> application attempt did start successfully. And unluckily, I didn't find
>> any logs written by YarnResourceManager in the jobmanager logs.
>>
>> The jobmanager logs are in the attachment (with some subtask status
>> change logs removed and env info censored).
>>
>> Thanks!
>>
>>
>> Best,
>> Paul Lam
>>
>> On Dec 16, 2019, at 14:42, Yang Wang wrote:
>>
>> Hi Paul,
>>
>> I have gone through the code and found that the root cause may be that
>> `YarnResourceManager` cleaned up the application staging directory. When
>> unregistering from the Yarn ResourceManager fails, a new attempt will be
>> launched and fail quickly because localization fails.
>>
>> I think it is a bug, and it will happen whenever unregistering the
>> application fails. Could you share some jobmanager logs of the second or a
>> following attempt so that I can confirm the bug?
>>
>>
>> Best,
>> Yang
>>
>> Paul Lam wrote on Sat, Dec 14, 2019 at 11:02 AM:
>>
>>> Hi,
>>>
>>> Recently I've seen a situation where a JobManager received a stop signal
>>> from the YARN RM but failed to exit and got into a restart loop, and it
>>> kept failing because the TaskManager containers were disconnected (killed
>>> by the RM as well), before it finally exited when it hit the limit of the
>>> restart policy. This further resulted in the Flink job being marked with
>>> final status FAILED and cleanup of the ZooKeeper paths, so when a new
>>> JobManager started up it found no checkpoint to restore and performed a
>>> stateless restart. In addition, the application runs with Flink 1.7.1 in
>>> HA job cluster mode on Hadoop 2.6.5.
>>>
>>> As far as I can remember, I've seen a similar issue related to the fencing
>>> of the JobManager, but I searched JIRA and couldn't find it. It would be
>>> great if someone could point me in the right direction. Any comments are
>>> also welcome! Thanks!
>>>
>>> Best,
>>> Paul Lam
>>>
>>
>>
>


Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-16 Thread Xintong Song
Glad that helped. I'm also posting this conversation to the public mailing
list, in case other people have similar questions.

And regarding the GC statement, I think the document is correct.
- The Flink Memory Manager guarantees that the amount of allocated managed
memory never exceeds the configured capacity, so managed memory allocation
should not trigger OOM.
- When preallocation is enabled, managed memory segments are allocated and
pooled by the Flink Memory Manager, regardless of whether any tasks request
them. The segments will not be deallocated until the cluster is shut down.
- When preallocation is disabled, managed memory segments are allocated
only when tasks request them, and destroyed immediately when tasks
return them to the Memory Manager. However, what this statement is trying to
say is that the memory is not deallocated directly when the memory segment
is destroyed, but has to wait for a GC to be truly released.

Thank you~

Xintong Song



On Tue, Dec 17, 2019 at 12:30 PM Ethan Li  wrote:

> Thank you very much Xintong! It's much clearer to me now.
>
> I am still on a standalone cluster setup. Before, I was using 350GB of
> on-heap memory on a 378GB box. I saw a lot of swap activity. Now I understand
> that it's because RocksDB didn't have enough memory to use, so the OS forced
> the JVM to swap. That explains why the cluster was not stable and kept
> crashing.
>
> Now that I put 150GB off-heap and 150GB on-heap, the cluster is more
> stable than before. I thought it was because GC was reduced since we now
> have less heap memory. Now I understand that it's because I have 78GB of
> memory available for RocksDB to use, 50GB more than before. And that explains
> why I don't see swapping anymore.
>
> This makes sense to me now. I just have to set preallocation to false so
> that RocksDB can use the other 150 GB of off-heap memory, and do some tuning
> on these memory configs.
>
>
> One thing I noticed is that in
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-memory-preallocate
>
>  If this configuration is set to false cleaning up of the allocated
> off-heap memory happens only when the configured JVM parameter
> MaxDirectMemorySize is reached by triggering a full GC
>
> I think this statement is not correct. GC is not triggered by reaching
> MaxDirectMemorySize. It will throw "java.lang.OutOfMemoryError: Direct
> buffer memory" if MaxDirectMemorySize is reached.
>
> Thank you again for your help!
>
> Best,
> Ethan
>
>
> On Dec 16, 2019, at 9:44 PM, Xintong Song  wrote:
>
> Hi Ethan,
>
> When you say "it's doing better than before", what was your setup before?
> Was it on-heap managed memory? With preallocation enabled or disabled? Also,
> what deployment (standalone, yarn, or local executor) do you run Flink on?
> It's hard to tell why the performance became better without knowing the
> information above.
>
> Since you are using RocksDB and configure managed memory to off-heap, you
> should set pre-allocation to false. A streaming job with the RocksDB state
> backend does not use managed memory at all. Setting managed memory to
> off-heap only makes Flink launch the JVM with a smaller heap, leaving more
> space outside the JVM. Setting pre-allocation to false makes Flink allocate
> managed memory on demand, and since there is no demand, the managed memory
> will not be allocated. Therefore, the memory space left outside the JVM can
> be fully leveraged by RocksDB.
>
> Regarding related source codes, I would recommend the following:
> - MemoryManager - For how managed memory is allocated / used. Related to
> pre-allocation.
> - ContaineredTaskManagerParameters - For how the JVM memory parameters are
> decided. Related to on-heap / off-heap managed memory.
> - TaskManagerServices#fromConfiguration - For how different components are
> created, as well as how their memory sizes are decided. Also related to
> on-heap / off-heap managed memory.
>
> Thank you~
> Xintong Song
>
>
>
> On Tue, Dec 17, 2019 at 11:00 AM Ethan Li 
> wrote:
>
>> Thank you Xintong and Vino for taking the time to answer my question. I
>> didn't know managed memory is only for batch jobs.
>>
>>
>>
>> I tried using off-heap Flink managed memory (with preallocation set to
>> true) and it's doing better than before. That would not make sense if
>> managed memory were not used. I was confused. Then I found this doc
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>> 
>>
>> Configuring an off-heap state backend like RocksDB means either also
>> setting managed memory to off-heap or adjusting the cutoff ratio, to
>> dedicate less memory to the JVM heap.
>>
>>
>> We use RocksDB too so I guess I was doing that correctly by accident. So
>> the question here is, in this case, should we set preallocate to true or
>> false?
>>
>> If set to true, TM will 

Re: Jobmanager not properly fenced when killed by YARN RM

2019-12-16 Thread Paul Lam
Hi Yang,

Thanks a lot for your reasoning. You are right about the YARN cluster. The
NodeManager crashed, and that's why the RM killed the containers on that
machine after a heartbeat timeout (about 10 min) with the NodeManager.

Actually, the attached logs are from the first/old jobmanager, and I couldn't
find the log about YARN application unregistration in any of the logs. I think
maybe the Flink resource manager was not trying to unregister the application
(which would also remove the HA service state) when it got a shutdown request,
because the Flink job was running well at that moment.

I dug a bit deeper and found that the root cause might be that the Flink
ResourceManager was taking too long to shut down and it didn't change the Flink
job status (so the JobManager would keep working even after the AM was killed
by the RM). I've also found the related issue mentioned in my previous mail. [1]

Thanks a lot for your help!

[1] https://issues.apache.org/jira/browse/FLINK-14010 


Best,
Paul Lam

> On Dec 16, 2019, at 20:35, Yang Wang wrote:
> 
> Hi Paul,
> 
> I found lots of "Failed to stop Container" logs in the jobmanager.log. It
> seems that the Yarn cluster is not working normally. So the Flink
> YarnResourceManager may also have failed to unregister the app. If the app
> is unregistered successfully, no new attempt will be started.
> 
> The second and following jobmanager attempts started and failed because the
> staging directory on HDFS had been cleaned up. So could you find the log
> "Could not unregister the application master" in all the jobmanager logs,
> including the first one?
> 
> 
> 
> Best,
> Yang
> 
> Paul Lam <paullin3...@gmail.com> wrote on Mon, Dec 16, 2019 at 7:56 PM:
> Hi Yang,
> 
> Thanks a lot for your reply!
> 
> I might not have made myself clear, but the new jobmanager in the new YARN
> application attempt did start successfully. And unluckily, I didn't find any
> logs written by YarnResourceManager in the jobmanager logs.
> 
> The jobmanager logs are in the attachment (with some subtask status change 
> logs removed and env info censored).
> 
> Thanks!
> 
> 
> Best,
> Paul Lam
> 
>> On Dec 16, 2019, at 14:42, Yang Wang wrote:
>> 
>> Hi Paul,
>> 
>> I have gone through the code and found that the root cause may be that
>> `YarnResourceManager` cleaned up the application staging directory. When
>> unregistering from the Yarn ResourceManager fails, a new attempt will be
>> launched and fail quickly because localization fails.
>> 
>> I think it is a bug, and it will happen whenever unregistering the
>> application fails. Could you share some jobmanager logs of the second or a
>> following attempt so that I can confirm the bug?
>> 
>> 
>> Best,
>> Yang
>> 
>> Paul Lam <paullin3...@gmail.com> wrote on Sat, Dec 14, 2019 at 11:02 AM:
>> Hi,
>> 
>> Recently I've seen a situation where a JobManager received a stop signal from
>> the YARN RM but failed to exit and got into a restart loop, and it kept failing
>> because the TaskManager containers were disconnected (killed by the RM as well)
>> before it finally exited when it hit the limit of the restart policy. This further
>> resulted in the Flink job being marked with final status FAILED and cleanup of
>> the ZooKeeper paths, so when a new JobManager started up it found no checkpoint
>> to restore and performed a stateless restart. In addition, the application
>> runs with Flink 1.7.1 in HA job cluster mode on Hadoop 2.6.5.
>> 
>> As far as I can remember, I've seen a similar issue related to the fencing of
>> the JobManager, but I searched JIRA and couldn't find it. It would be great
>> if someone could point me in the right direction. Any comments are also
>> welcome! Thanks!
>> 
>> Best,
>> Paul Lam
> 



Re:flink 1.9 conflict jackson version

2019-12-16 Thread ouywl
Hi Bu,

I think you can use the maven-shade-plugin to resolve your problem. It seems
the flink-client Jackson version conflicts with the one in your own jar?

ouywl
ou...@139.com

On 12/17/2019 08:10, Fanbin Bu wrote:

Hi,
After I upgraded to Flink 1.9, I got the following error message on EMR; it
works locally in IntelliJ.
I'm explicitly declaring the dependency as
implementation 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
and I have
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.11.595'

java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.ObjectMapper.enable([Lcom/fasterxml/jackson/core/JsonParser$Feature;)Lcom/fasterxml/jackson/databind/ObjectMapper;
	at com.amazonaws.partitions.PartitionsLoader.<init>(PartitionsLoader.java:54)
	at com.amazonaws.regions.RegionMetadataFactory.create(RegionMetadataFactory.java:30)
	at com.amazonaws.regions.RegionUtils.initialize(RegionUtils.java:65)
	at com.amazonaws.regions.RegionUtils.getRegionMetadata(RegionUtils.java:53)
	at com.amazonaws.regions.RegionUtils.getRegion(RegionUtils.java:107)
	at com.amazonaws.client.builder.AwsClientBuilder.getRegionObject(AwsClientBuilder.java:256)
	at com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:460)
	at com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424)
	at com.amazonaws.client.builder.AwsAsyncClientBuilder.build(AwsAsyncClientBuilder.java:80)
	at com.coinbase.util.KmsClient$.getSnowflakeUsernamePassword(KmsClient.scala:21)
	at com.coinbase.ml.RunFlinkJob$.runBatch(RunFlinkJob.scala:94)
	at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:38)
	at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
	at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)






Re: Fw: Metrics based on data filtered from DataStreamSource

2019-12-16 Thread vino yang
Hi Sidney,

>> I'd prefer not to consume messages I don't plan on actually handling.

It depends on your design. If you produce different types into different
partitions, then it's easy to filter types out at the Kafka consumer
(by consuming only some of the partitions).

If you do not distinguish the types across the partitions of the Kafka
topic, you can filter messages based on type in the Flink job.

>> I MUST consume the messages, count those I want to filter out and then
simply not handle them?

I did not say "you MUST", I said "you can".

Actually, there are several solutions, e.g.:
1) the one I described in the last mail;
2) filter in the Flink source;
3) filter via a Flink filter transform function;
4) side output / split-select.

Choose the solution that suits your scenario; a minimal sketch of option 3
follows below.
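For example, here is a minimal sketch of option 3, assuming an `Event` type
like the one in your snippet (the class, method and metric names are
placeholders, not tested code):

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// The filter itself is a rich function, so its open() is invoked by the task
// and the counter is registered on the operator that actually drops events.
public abstract class CountingFilter extends RichFilterFunction<Event> {

    private transient Counter filteredCounter;

    @Override
    public void open(Configuration parameters) {
        filteredCounter = getRuntimeContext()
                .getMetricGroup()
                .addGroup(getClass().getSimpleName())
                .counter("filtered");
    }

    @Override
    public boolean filter(Event event) {
        boolean keep = accept(event);
        if (!keep) {
            filteredCounter.inc();  // count every event that is filtered out
        }
        return keep;
    }

    // handler-specific filtering condition
    protected abstract boolean accept(Event event);
}

// usage: dataStream = dataStreamSource.filter(new MyEventFilter()).map(handler);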

The key point of my last mail was to describe the problem with your
reflection-based approach.

Best,
Vino

Sidney Feiner wrote on Mon, Dec 16, 2019 at 9:31 PM:

> You are right about everything you say!
> The solution you propose is actually what I'm trying to avoid: I'd prefer
> not to consume messages I don't plan on actually handling.
> But from what you say it sounds like I have no other choice. Am I right? I
> MUST consume the messages, count those I want to filter out, and then simply
> not handle them?
> Which means I must filter them in the task itself and have no way of
> filtering them out directly at the data source?
>
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
> [image: emailsignature]
>
> --
> *From:* vino yang 
> *Sent:* Monday, December 16, 2019 7:56 AM
> *To:* Sidney Feiner 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Fw: Metrics based on data filtered from DataStreamSource
>
> Hi Sidney,
>
> Firstly, the `open` method of a UDF instance is always invoked when the
> task thread starts to run.
>
> From the second code snippet image that you provided, I guess you are
> trying to get a dynamic handler via reflection, is
> that correct? I also guess that you want to get a dynamic instance of a
> handler at runtime; correct me if I am wrong.
>
> IMO, you may be mixing up the program you write and the runtime of the Task:
> the purpose of your program is to build the job graph, while the business
> logic in the UDF describes your actual processing logic.
>
> For your scenario, if many types of events exist in one topic, you could
> consume them all, group by the type, and then count them?
>
> Best,
> Vino
>
> Sidney Feiner wrote on Mon, Dec 16, 2019 at 12:54 AM:
>
> Hey,
> I have a question about using metrics based on filtered data.
> Basically, I have handlers for many types of events I get from my data
> source (in my case, Kafka), and every handler has it's own filter function.
> That given handler also has a Counter, incrementing every time it filters
> out an event (as part of the FilterFunction).
>
> The problem arises when I use that FilterFunction on the DataStreamSource:
> the handler's open() function hasn't been called and thus the metrics have
> never been initialized.
> Do I have a way of making this work? Or any other way of counting events
> that have been filtered out from the DataStreamSource?
>
> Handler:
>
> public abstract class Handler extends RichMapFunction {
> private transient Counter filteredCounter;
> private boolean isInit = false;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> if (!isInit) {
> MetricGroup metricGroup = 
> getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
> filteredCounter = 
> metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
> isInit = true;
> }
> }
>
> public final FilterFunction<Event> getFilter() {
> return (FilterFunction<Event>) event -> {
> boolean res = filter(event);
> if (!res) {
> filteredCounter.inc();
> }
> return res;
> };
> }
>
> abstract protected boolean filter(Event event);
> }
>
>
> And when I init the DataStreamSource:
>
> Handler handler = (Handler) 
> Class.forName(handlerName).getConstructor().newInstance();
> dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
>
>
> Any help would be much appreciated!
>
> Thanks 
>
>
>
>


Re: [EXTERNAL] Flink and Prometheus monitoring question

2019-12-16 Thread Zhu Zhu
Hi Jesús,
If your job has checkpointing enabled, you can monitor
'numberOfCompletedCheckpoints' to see whether the job is still alive and
healthy.

Thanks,
Zhu Zhu

Jesús Vásquez  于2019年12月17日周二 上午2:43写道:

> The thing about the numRunningJobs metric is that I have to configure the
> Prometheus rules in advance with the number of jobs I expect to be running
> in order to alert, and I kind of need a rule that alerts on individual jobs. I
> initially thought of flink_jobmanager_downtime{job_id=~".*"} == -1, but it
> turned out that the metric just emits 0 for running jobs and doesn't emit -1
> for failed jobs.
>
On Mon, Dec 16, 2019, 7:01 PM, PoolakkalMukkath, Shakir <
shakir_poolakkalmukk...@comcast.com> wrote:
>
>> You could use “flink_jobmanager_numRunningJobs” to check the number of
>> running jobs.
>>
>>
>>
>> Thanks
>>
>>
>>
>> *From: *Jesús Vásquez 
>> *Date: *Monday, December 16, 2019 at 12:47 PM
>> *To: *"user@flink.apache.org" 
>> *Subject: *[EXTERNAL] Flink and Prometheus monitoring question
>>
>>
>>
>> Hi,
>>
>> I want to monitor Flink Streaming jobs using Prometheus
>>
>> My first goal is to send alerts when a Flink job has failed.
>>
>> The thing is that, looking at the documentation, I haven't found a metric
>> that helps me define an alerting rule.
>>
>> As a starting point I thought that the metric
>> flink_jobmanager_job_downtime could help, since the doc says this metric
>> emits -1 for a completed job.
>>
>> But when I tested this I found out it doesn't work, since the metric
>> always emits 0 and after the job is completed there is no metric.
>>
>> Has anyone managed to alert when flink job has failed with Prometheus?
>>
>> Thanks for your help.
>>
>


Re: Join a datastream with tables stored in Hive

2019-12-16 Thread Kurt Young
Great, looking forward to hearing from you again.

Best,
Kurt


On Mon, Dec 16, 2019 at 10:22 PM Krzysztof Zarzycki 
wrote:

> Thanks Kurt for your answers.
>
> Summing up, I feel like option 1 (i.e. the join with a temporal table
> function) requires some coding around a source that needs to pull data
> once a day, but otherwise brings the following benefits:
> * I don't have to put the dicts in another store like HBase. Everything
> stays in Hive + Flink.
> * I'll be able to make a true temporal join, event-time based.
> * I believe I will be able to build a history reprocessing program based
> on the same logic (i.e. the same SQL). At least for a particular day;
> processing multiple days would be tricky, because I would need to pull
> multiple versions of the dictionary.
> Plus, looking up dict values will be much faster and more resource efficient
> when the dict is stored in state instead of an uncached HBase. That's
> especially important when we want to reprocess a historical, archived stream
> at a speed of millions of events/sec.
>
> I understand that option 2 is easier to implement. I may do a PoC of it as
> well.
> OK, I believe I know enough to get my hands dirty with the code. I can
> share later on what I was able to accomplish, and probably more questions
> will show up when I finally start the implementation.
>
> Thanks
> Krzysztof
>
> On Mon, Dec 16, 2019 at 03:14, Kurt Young wrote:
>
>> Hi Krzysztof, thanks for the discussion. You raised lots of good
>> questions, and I will try to reply to them one by one.
>>
>> Re option 1:
>>
>> > Question 1: do I need to write that Hive source or can I use something
>> ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
>> class?
>>
>> I'm not sure if you can reuse the logic of `HiveTableSource`. Currently
>> `HiveTableSource` works in batch mode: it reads all data at once and stops.
>> But what you need is to wait until the next day after finishing. What you
>> can try is to reuse the logic of `HiveTableInputFormat` and wrap the
>> "monitoring" logic outside.
>>
>> > Question/worry 2: the state would grow infinitely if I had an infinite
>> number of keys, not just an infinite number of versions of all keys.
>>
>> The temporal table function doesn't support watermark-based state cleanup
>> yet, but what you can try is idle state retention [1]. So even if you have
>> an infinite number of keys, for example if you have different join keys
>> every day, the old keys will not be touched in the following days; they
>> become idle and will be deleted by the framework.
>>
>> > Question 3: Do you imagine that I could use the same logic for both
>> stream processing and reprocessing just by replacing sources and sinks?
>>
>> Generally speaking, yes, I think so. With an event-time based join, we
>> should be able to reuse the logic between normal stream processing and
>> reprocessing of historical data, although there will definitely be some
>> details to address, like event time and watermarks.
>>
>> Re option 2:
>>
>> > maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
>> whole dictionary to memory
>>
>> You can do this manually, but I would recommend going with the first
>> choice, which loads the Hive table into HBase periodically. It's much
>> easier and more efficient. And the approach you mentioned also seems to
>> duplicate the temporal table function solution a bit.
>>
>> > this option is available only with Blink engine and also only with use
>> of Flink SQL, no Table API?
>>
>> I'm afraid yes, you can only use it with SQL for now.
>>
>> > do you think it would be possible to use the same logic / SQL for
>> reprocessing?
>>
>> Given that this solution is based on processing time, I don't think it
>> can cover the reprocessing use case, unless you can accept always joining
>> with the latest day even during backfilling. But we are also aiming to
>> resolve this shortcoming, maybe in 1 or 2 releases.
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>>
>> On Sat, Dec 14, 2019 at 3:41 AM Krzysztof Zarzycki 
>> wrote:
>>
>>> Very interesting, Kurt! Yes, I also imagined it's rather a very common
>>> case. In my company we currently have 3 clients wanting this functionality.
>>> I also just realized this slight difference between Temporal Join and
>>> Temporal Table Function Join, that there are actually two methods:)
>>>
>>> Regarding option 1:
>>> So I would need to:
>>> * write a Datastream API source, that pulls Hive dictionary table every
>>> let's say day, assigns event time column to rows and creates a stream of
>>> it. It does that and only that.
>>> * create a table (from Table API) out of it, assigning one of the
>>> columns as an event time column.
>>> * then use table.createTemporalTableFunction(<event time column>)
>>> * finally join my main data stream with the temporal table function (let
>>> me use short name 

flink 1.9 conflict jackson version

2019-12-16 Thread Fanbin Bu
Hi,

After I upgraded to Flink 1.9, I got the following error message on EMR; it
works locally in IntelliJ.

I'm explicitly declaring the dependency as
implementation
'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
and I have
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version:
'1.11.595'



java.lang.NoSuchMethodError:
com.fasterxml.jackson.databind.ObjectMapper.enable([Lcom/fasterxml/jackson/core/JsonParser$Feature;)Lcom/fasterxml/jackson/databind/ObjectMapper;
at 
com.amazonaws.partitions.PartitionsLoader.<init>(PartitionsLoader.java:54)
at 
com.amazonaws.regions.RegionMetadataFactory.create(RegionMetadataFactory.java:30)
at com.amazonaws.regions.RegionUtils.initialize(RegionUtils.java:65)
at 
com.amazonaws.regions.RegionUtils.getRegionMetadata(RegionUtils.java:53)
at com.amazonaws.regions.RegionUtils.getRegion(RegionUtils.java:107)
at 
com.amazonaws.client.builder.AwsClientBuilder.getRegionObject(AwsClientBuilder.java:256)
at 
com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:460)
at 
com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424)
at 
com.amazonaws.client.builder.AwsAsyncClientBuilder.build(AwsAsyncClientBuilder.java:80)
at 
com.coinbase.util.KmsClient$.getSnowflakeUsernamePassword(KmsClient.scala:21)
at com.coinbase.ml.RunFlinkJob$.runBatch(RunFlinkJob.scala:94)
at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:38)
at 
com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
at 
com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)


Re: [EXTERNAL] Flink and Prometheus monitoring question

2019-12-16 Thread Jesús Vásquez
The thing about the numRunningJobs metric is that I have to configure the
Prometheus rules in advance with the number of jobs I expect to be running
in order to alert, and I kind of need a rule that alerts on individual jobs. I
initially thought of flink_jobmanager_downtime{job_id=~".*"} == -1, but it
turned out that the metric just emits 0 for running jobs and doesn't emit -1
for failed jobs.

On Mon, Dec 16, 2019, 7:01 PM, PoolakkalMukkath, Shakir <
shakir_poolakkalmukk...@comcast.com> wrote:

> You could use “flink_jobmanager_numRunningJobs” to check the number of
> running jobs.
>
>
>
> Thanks
>
>
>
> *From: *Jesús Vásquez 
> *Date: *Monday, December 16, 2019 at 12:47 PM
> *To: *"user@flink.apache.org" 
> *Subject: *[EXTERNAL] Flink and Prometheus monitoring question
>
>
>
> Hi,
>
> I want to monitor Flink Streaming jobs using Prometheus
>
> My first goal is to send alerts when a Flink job has failed.
>
> The thing is that, looking at the documentation, I haven't found a metric
> that helps me define an alerting rule.
>
> As a starting point I thought that the metric
> flink_jobmanager_job_downtime could help, since the doc says this metric
> emits -1 for a completed job.
>
> But when I tested this I found out it doesn't work, since the metric
> always emits 0 and after the job is completed there is no metric.
>
> Has anyone managed to alert when flink job has failed with Prometheus?
>
> Thanks for your help.
>


Re: [EXTERNAL] Flink and Prometheus monitoring question

2019-12-16 Thread PoolakkalMukkath, Shakir
You could use “flink_jobmanager_numRunningJobs” to check the number of running 
jobs.

Thanks

From: Jesús Vásquez 
Date: Monday, December 16, 2019 at 12:47 PM
To: "user@flink.apache.org" 
Subject: [EXTERNAL] Flink and Prometheus monitoring question

Hi,
I want to monitor Flink Streaming jobs using Prometheus
My first goal is to send alerts when a Flink job has failed.
The thing is that, looking at the documentation, I haven't found a metric that
helps me define an alerting rule.
As a starting point I thought that the metric flink_jobmanager_job_downtime
could help, since the doc says this metric emits -1 for a completed job.
But when I tested this I found out it doesn't work, since the metric always
emits 0 and after the job is completed there is no metric.
Has anyone managed to alert when flink job has failed with Prometheus?
Thanks for your help.


Flink and Prometheus monitoring question

2019-12-16 Thread Jesús Vásquez
Hi,
I want to monitor Flink Streaming jobs using Prometheus
My first goal is to send alerts when a Flink job has failed.
The thing is that, looking at the documentation, I haven't found a metric
that helps me define an alerting rule.
As a starting point I thought that the metric flink_jobmanager_job_downtime
could help, since the doc says this metric emits -1 for a completed job.
But when I tested this I found out it doesn't work, since the metric
always emits 0 and after the job is completed there is no metric.
Has anyone managed to alert when flink job has failed with Prometheus?
Thanks for your help.


Re: FlinkSQL中关于TIMESTAMPDIFF函数官网E.g的疑问

2019-12-16 Thread jingjing bai
I'm not sure which version you are using.
In 1.9, an attribute declared as type TIMESTAMP needs to be formatted as
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
However, you can also pass in a 13-digit (millisecond) timestamp from the
outside and convert it to TIMESTAMP, for example by defining it in the DDL:

CREATE TABLE `t` (
   ctm TIMESTAMP
) WITH (
  'format.schema' = 'ROW'
)

If the source field should also be defined as a TIMESTAMP type, define the
external source format as follows:

DateTimeFormatter t = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.appendLiteral('T')
.append(new DateTimeFormatterBuilder()
.appendPattern("HH:mm:ss")
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
.appendPattern("'Z'")
.toFormatter())
.toFormatter();

Use this formatter structure to parse the time type.


Zhenghua Gao wrote on Mon, Dec 16, 2019 at 7:42 PM:

> 1) Screenshots posted directly to the mailing list cannot be displayed; you
> can upload them to a third-party image host and link to them.
> 2) Please confirm whether the types of time1/time2 are TIMESTAMP.
> 3) TIMESTAMP '2003-01-02 10:00:00' in the documentation is a standard SQL
> timestamp literal; your TIMESTAMP time1 cannot be treated as a timestamp literal.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Dec 16, 2019 at 3:51 PM 1530130567 <1530130...@qq.com> wrote:
>
> > Hi all,
> > My business logic needs to compute the time difference between two dates, so
> > as usual I opened the official docs, looked at the built-in functions,
> > copy-pasted the example, and it ran without problems!
> >
> >   Then I changed it to:
> >   TIMESTAMPDIFF(DAY, TIMESTAMP time1, TIMESTAMP time2)
> >   and SQL validation reported an error!
> >   Then I changed it again to:
> >   TIMESTAMPDIFF(DAY, cast(time1 as timestamp), cast(time2 as timestamp))
> >   and it passed!
> >
> >   I'm a bit confused: does TIMESTAMP only work with fixed string literals,
> >   and not with a variable such as a column name?
> >
> >
>


Re: Join a datastream with tables stored in Hive

2019-12-16 Thread Krzysztof Zarzycki
Thanks Kurt for your answers.

Summing up, I feel like option 1 (i.e. the join with a temporal table
function) requires some coding around a source that needs to pull data
once a day, but otherwise brings the following benefits:
* I don't have to put the dicts in another store like HBase. Everything stays
in Hive + Flink.
* I'll be able to make a true temporal join, event-time based.
* I believe I will be able to build a history reprocessing program based on
the same logic (i.e. the same SQL). At least for a particular day; processing
multiple days would be tricky, because I would need to pull multiple
versions of the dictionary.
Plus, looking up dict values will be much faster and more resource efficient
when the dict is stored in state instead of an uncached HBase. That's
especially important when we want to reprocess a historical, archived stream
at a speed of millions of events/sec.
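To make this concrete, here is a minimal Java sketch of the join part I have
in mind (the field names, the Tuple2-based streams and the daily-refresh
source are hypothetical placeholders, not tested code):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class TemporalDictJoinSketch {

    // dictStream: (dictKey, dictValue) pairs emitted by the custom source that
    // re-reads the Hive dictionary once a day; mainStream: (id, dictKey) events.
    // Both streams are assumed to already have event-time timestamps and
    // watermarks assigned.
    public static Table join(StreamTableEnvironment tEnv,
                             DataStream<Tuple2<String, String>> dictStream,
                             DataStream<Tuple2<String, String>> mainStream) {

        // Turn the dictionary stream into a table with a rowtime attribute and
        // expose it as a temporal table function keyed on dictKey.
        Table dict = tEnv.fromDataStream(dictStream, "dictKey, dictValue, loadTime.rowtime");
        TemporalTableFunction dictFn = dict.createTemporalTableFunction("loadTime", "dictKey");
        tEnv.registerFunction("Dict", dictFn);

        tEnv.registerTable("events", tEnv.fromDataStream(mainStream, "id, dictKey, eventTime.rowtime"));

        // Event-time temporal join: each event sees the dictionary version that
        // was valid at the event's timestamp.
        return tEnv.sqlQuery(
            "SELECT e.id, d.dictValue " +
            "FROM events AS e, LATERAL TABLE (Dict(e.eventTime)) AS d " +
            "WHERE e.dictKey = d.dictKey");
    }
}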

I understand that option 2 is easier to implement. I may do a PoC of it as
well.
OK, I believe I know enough to get my hands dirty with the code. I can
share later on what I was able to accomplish, and probably more questions
will show up when I finally start the implementation.

Thanks
Krzysztof

On Mon, Dec 16, 2019 at 03:14, Kurt Young wrote:

> Hi Krzysztof, thanks for the discussion. You raised lots of good
> questions, and I will try to reply to them one by one.
>
> Re option 1:
>
> > Question 1: do I need to write that Hive source or can I use something
> ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
> class?
>
> I'm not sure if you can reuse the logic of `HiveTableSource`. Currently
> `HiveTableSource` works in batch mode: it reads all data at once and stops.
> But what you need is to wait until the next day after finishing. What you
> can try is to reuse the logic of `HiveTableInputFormat` and wrap the
> "monitoring" logic outside.
>
> > Question/worry 2: the state would grow infinitely if I had an infinite
> number of keys, not just an infinite number of versions of all keys.
>
> The temporal table function doesn't support watermark-based state cleanup
> yet, but what you can try is idle state retention [1]. So even if you have
> an infinite number of keys, for example if you have different join keys
> every day, the old keys will not be touched in the following days; they
> become idle and will be deleted by the framework.
>
> > Question 3: Do you imagine that I could use the same logic for both
> stream processing and reprocessing just by replacing sources and sinks?
>
> Generally speaking, yes, I think so. With an event-time based join, we
> should be able to reuse the logic between normal stream processing and
> reprocessing of historical data, although there will definitely be some
> details to address, like event time and watermarks.
>
> Re option 2:
>
> > maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
> whole dictionary to memory
>
> You can do this manually, but I would recommend going with the first
> choice, which loads the Hive table into HBase periodically. It's much
> easier and more efficient. And the approach you mentioned also seems to
> duplicate the temporal table function solution a bit.
>
> > this option is available only with Blink engine and also only with use
> of Flink SQL, no Table API?
>
> I'm afraid yes, you can only use it with SQL for now.
>
> > do you think it would be possible to use the same logic / SQL for
> reprocessing?
>
> Given that this solution is based on processing time, I don't think it
> can cover the reprocessing use case, unless you can accept always joining
> with the latest day even during backfilling. But we are also aiming to
> resolve this shortcoming, maybe in 1 or 2 releases.
>
> Best,
> Kurt
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
>
> On Sat, Dec 14, 2019 at 3:41 AM Krzysztof Zarzycki 
> wrote:
>
>> Very interesting, Kurt! Yes, I also imagined it's rather a very common
>> case. In my company we currently have 3 clients wanting this functionality.
>> I also just realized this slight difference between Temporal Join and
>> Temporal Table Function Join, that there are actually two methods:)
>>
>> Regarding option 1:
>> So I would need to:
>> * write a Datastream API source, that pulls Hive dictionary table every
>> let's say day, assigns event time column to rows and creates a stream of
>> it. It does that and only that.
>> * create a table (from Table API) out of it, assigning one of the columns
>> as an event time column.
>> * then use table.createTemporalTableFunction(<event time column>)
>> * finally join my main data stream with the temporal table function (let
>> me use short name TTF from now) from my dictionary, using Flink SQL and 
>> LATERAL
>> TABLE (Rates(o.rowtime)) AS r construct.
>> And so I should achieve my temporal event-time based join with versioned
>> dictionaries!
>> Question 1: do I need to write that Hive source or can I use 

Re: Fw: Metrics based on data filtered from DataStreamSource

2019-12-16 Thread Sidney Feiner
You are right about everything you say!
The solution you propose is actually what I'm trying to avoid: I'd prefer not
to consume messages I don't plan on actually handling.
But from what you say it sounds like I have no other choice. Am I right? I MUST
consume the messages, count those I want to filter out, and then simply not
handle them?
Which means I must filter them in the task itself and have no way of
filtering them out directly at the data source?



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp

[emailsignature]



From: vino yang 
Sent: Monday, December 16, 2019 7:56 AM
To: Sidney Feiner 
Cc: user@flink.apache.org 
Subject: Re: Fw: Metrics based on data filtered from DataStreamSource

Hi Sidney,

Firstly, the `open` method of a UDF instance is always invoked when the task
thread starts to run.

From the second code snippet image that you provided, I guess you are trying to
get a dynamic handler via reflection, is that correct? I also guess
that you want to get a dynamic instance of a handler at runtime; correct me
if I am wrong.

IMO, you may be mixing up the program you write and the runtime of the Task: the
purpose of your program is to build the job graph, while the business logic in
the UDF describes your actual processing logic.

For your scenario, if many types of events exist in one topic, you could consume
them all, group by the type, and then count them?

Best,
Vino

Sidney Feiner <sidney.fei...@startapp.com> wrote on Mon, Dec 16, 2019 at 12:54 AM:
Hey,
I have a question about using metrics based on filtered data.
Basically, I have handlers for many types of events I get from my data source 
(in my case, Kafka), and every handler has it's own filter function.
That given handler also has a Counter, incrementing every time it filters out 
an event (as part of the FilterFunction).

The problem arises when I use that FilterFunction on the DataStreamSource: the
handler's open() function hasn't been called and thus the metrics have never
been initialized.
Do I have a way of making this work? Or any other way of counting events that 
have been filtered out from the DataStreamSource?

Handler:

public abstract class Handler extends RichMapFunction {
private transient Counter filteredCounter;
private boolean isInit = false;

@Override
public void open(Configuration parameters) throws Exception {
if (!isInit) {
MetricGroup metricGroup = 
getRuntimeContext().getMetricGroup().addGroup(getClass().getSimpleName());
filteredCounter = 
metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
isInit = true;
}
}

public final FilterFunction<Event> getFilter() {
return (FilterFunction<Event>) event -> {
boolean res = filter(event);
if (!res) {
filteredCounter.inc();
}
return res;
};
}

abstract protected boolean filter(Event event);
}

And when I init the DataStreamSource:

Handler handler = (Handler) 
Class.forName(handlerName).getConstructor().newInstance();
dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);

Any help would be much appreciated!

Thanks 





Re: Documentation tasks for release-1.10

2019-12-16 Thread vino yang
+1 for centralizing all the documentation issues so that the community can
fix them more effectively.

Best,
Vino

Xintong Song wrote on Mon, Dec 16, 2019 at 6:02 PM:

> Thank you Kostas.
> Big +1 for keeping all the documentation related issues at one place.
>
> I've added the documentation task for resource management.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Dec 16, 2019 at 5:29 PM Kostas Kloudas 
> wrote:
>
>> Hi all,
>>
>> With the feature-freeze for the release-1.10 already past us, it is
>> time to focus a little bit on documenting the new features that the
>> community added to this release, and improving the already existing
>> documentation based on questions that we see in Flink's mailing lists.
>>
>> To this end, I have created an umbrella issue
>> https://issues.apache.org/jira/browse/FLINK-15273 to monitor the
>> pending documentation tasks. This is by no means an exhaustive list of
>> the tasks, so feel free to add more.
>>
>> Having a central place with all these tasks will also make it easier for
>> other members of the community, not necessarily the people who
>> implemented the features, to get involved with the project. This is a
>> really helpful way to get in touch with the community and help the
>> project reach even greater audiences, as a feature that is not
>> documented is non-existent to users.
>>
>> Thanks a lot,
>> Kostas
>>
>


Re: Jobmanager not properly fenced when killed by YARN RM

2019-12-16 Thread Yang Wang
Hi Paul,

I found lots of "Failed to stop Container" logs in the jobmanager.log. It
seems that the Yarn cluster is not working normally. So the Flink
YarnResourceManager may also have failed to unregister the app. If the app is
unregistered successfully, no new attempt will be started.

The second and following jobmanager attempts started and failed because the
staging directory on HDFS had been cleaned up. So could you find the log
"Could not unregister the application master" in all the jobmanager logs,
including the first one?



Best,
Yang

Paul Lam wrote on Mon, Dec 16, 2019 at 7:56 PM:

> Hi Yang,
>
> Thanks a lot for your reply!
>
> I might not have made myself clear, but the new jobmanager in the new YARN
> application attempt did start successfully. And unluckily, I didn't find
> any logs written by YarnResourceManager in the jobmanager logs.
>
> The jobmanager logs are in the attachment (with some subtask status change
> logs removed and env info censored).
>
> Thanks!
>
>
> Best,
> Paul Lam
>
> On Dec 16, 2019, at 14:42, Yang Wang wrote:
>
> Hi Paul,
>
> I have gone through the code and found that the root cause may be that
> `YarnResourceManager` cleaned up the application staging directory. When
> unregistering from the Yarn ResourceManager fails, a new attempt will be
> launched and fail quickly because localization fails.
>
> I think it is a bug, and it will happen whenever unregistering the
> application fails. Could you share some jobmanager logs of the second or a
> following attempt so that I can confirm the bug?
>
>
> Best,
> Yang
>
> Paul Lam wrote on Sat, Dec 14, 2019 at 11:02 AM:
>
>> Hi,
>>
>> Recently I've seen a situation where a JobManager received a stop signal
>> from the YARN RM but failed to exit and got into a restart loop, and it kept
>> failing because the TaskManager containers were disconnected (killed by the
>> RM as well), before it finally exited when it hit the limit of the restart
>> policy. This further resulted in the Flink job being marked with final
>> status FAILED and cleanup of the ZooKeeper paths, so when a new JobManager
>> started up it found no checkpoint to restore and performed a stateless
>> restart. In addition, the application runs with Flink 1.7.1 in HA job
>> cluster mode on Hadoop 2.6.5.
>>
>> As far as I can remember, I've seen a similar issue related to the fencing
>> of the JobManager, but I searched JIRA and couldn't find it. It would be
>> great if someone could point me in the right direction. Any comments are
>> also welcome! Thanks!
>>
>> Best,
>> Paul Lam
>>
>
>


Re: FlinkSQL中关于TIMESTAMPDIFF函数官网E.g的疑问

2019-12-16 Thread Zhenghua Gao
1) Screenshots posted directly to the mailing list cannot be displayed; you can
upload them to a third-party image host and link to them.
2) Please confirm whether the types of time1/time2 are TIMESTAMP.
3) TIMESTAMP '2003-01-02 10:00:00' in the documentation is a standard SQL
timestamp literal; your TIMESTAMP time1 cannot be treated as a timestamp
literal (see the sketch below).
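To illustrate point 3, here is a minimal sketch (the table and column names are
hypothetical, and `tEnv` is assumed to be an existing StreamTableEnvironment
with a table `t` already registered whose time1/time2 columns are castable to
TIMESTAMP):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TimestampDiffExample {

    public static Table dayDiff(StreamTableEnvironment tEnv) {
        // TIMESTAMP '...' only accepts a fixed literal string;
        // for columns, cast them to TIMESTAMP instead.
        return tEnv.sqlQuery(
            "SELECT " +
            "  TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 10:00:00', TIMESTAMP '2003-01-09 10:00:00') AS literal_diff, " +
            "  TIMESTAMPDIFF(DAY, CAST(time1 AS TIMESTAMP), CAST(time2 AS TIMESTAMP)) AS column_diff " +
            "FROM t");
    }
}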

*Best Regards,*
*Zhenghua Gao*


On Mon, Dec 16, 2019 at 3:51 PM 1530130567 <1530130...@qq.com> wrote:

> Hi all,
> My business logic needs to compute the time difference between two dates, so
> as usual I opened the official docs, looked at the built-in functions,
> copy-pasted the example, and it ran without problems!
>
>   Then I changed it to:
>   TIMESTAMPDIFF(DAY, TIMESTAMP time1, TIMESTAMP time2)
>   and SQL validation reported an error!
>   Then I changed it again to:
>   TIMESTAMPDIFF(DAY, cast(time1 as timestamp), cast(time2 as timestamp))
>   and it passed!
>
>   I'm a bit confused: does TIMESTAMP only work with fixed string literals,
>   and not with a variable such as a column name?
>
>


Re: Documentation tasks for release-1.10

2019-12-16 Thread Xintong Song
Thank you Kostas.
Big +1 for keeping all the documentation related issues at one place.

I've added the documentation task for resource management.

Thank you~

Xintong Song



On Mon, Dec 16, 2019 at 5:29 PM Kostas Kloudas  wrote:

> Hi all,
>
> With the feature-freeze for the release-1.10 already past us, it is
> time to focus a little bit on documenting the new features that the
> community added to this release, and improving the already existing
> documentation based on questions that we see in Flink's mailing lists.
>
> To this end, I have created an umbrella issue
> https://issues.apache.org/jira/browse/FLINK-15273 to monitor the
> pending documentation tasks. This is by no means an exhaustive list of
> the tasks, so feel free to add more.
>
> Having a central place with all these tasks will also make it easier for
> other members of the community, not necessarily the people who
> implemented the features, to get involved with the project. This is a
> really helpful way to get in touch with the community and help the
> project reach even greater audiences, as a feature that is not
> documented is non-existent to users.
>
> Thanks a lot,
> Kostas
>


Documentation tasks for release-1.10

2019-12-16 Thread Kostas Kloudas
Hi all,

With the feature-freeze for the release-1.10 already past us, it is
time to focus a little bit on documenting the new features that the
community added to this release, and improving the already existing
documentation based on questions that we see in Flink's mailing lists.

To this end, I have created an umbrella issue
https://issues.apache.org/jira/browse/FLINK-15273 to monitor the
pending documentation tasks. This is by no means an exhaustive list of
the tasks, so feel free to add more.

Having a central place with all these tasks will also make it easier for
other members of the community, not necessarily the people who
implemented the features, to get involved with the project. This is a
really helpful way to get in touch with the community and help the
project reach even greater audiences, as a feature that is not
documented is non-existent to users.

Thanks a lot,
Kostas