flink????????????????

2019-12-17 Thread cs
flink run -m yarn-cluster -yn 10 -ys 1 -p 10 -yn 20 -ys 2 -p 40


Re: How to reprocess certain events in Flink?

2019-12-17 Thread Rafi Aroch
Hi Pooja,

Here's an implementation from Jamie Grier for de-duplication using an
in-memory cache with an expiration time:
https://github.com/jgrier/FilteringExample/blob/master/src/main/java/com/dataartisans/DedupeFilteringJob.java

If, for your use case, you can bound the time period in which duplicates may
appear, you can use this approach.
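
As a rough sketch of the same idea on top of Flink's keyed state with a TTL (the
Event class, its transactionId getter and the one-hour TTL below are just assumptions,
not taken from Jamie's example):

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;

// Keeps only the first event per transaction_id; the "seen" flag expires after 1 hour,
// so only duplicates arriving within that period are filtered out.
public class DedupeFilter extends RichFilterFunction<Event> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Boolean.class);
        desc.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).build());
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public boolean filter(Event event) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            return true;   // first time this transaction_id is seen
        }
        return false;      // duplicate within the TTL period, drop it
    }
}

// usage: events.keyBy(e -> e.getTransactionId()).filter(new DedupeFilter())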

Thanks,
Rafi


On Wed, Dec 18, 2019 at 8:16 AM Pooja Agrawal  wrote:

> Hey,
>
> I am sorry for the confusion. So, the value is not already present in the
> event. We are reading it from a static table (kind of data enrichment in
> flink job). Above event is an enriched event.
> If we say that this is some transaction event, the user would have done
> the transaction once and hence the transaction_id is unique. But, the table
> from where we are reading the value may contain the wrong value (not
> always, sometimes because of bug). In this case, we may want to reprocess
> that transaction event with new value (here, the transaction_id will be
> same as previous, but the value will change). I hope this clears the
> scenario. Let me know if you have any other questions.
>
> To solve the idempotency problem, you suggested to maintain a set
> recording transaction_id(s). Since, we are aggregating over all events seen
> till now, the number of events and hence ids will be too large. I am
> assuming we will need to have some external store here and do a lookup
> every time we process an event. This may increase the latency. Can you
> suggest the efficient way to solve this? and if we need to have an external
> store, what will be the best candidate?
>
> Thanks
> Pooja
>
>
>
> On Wed, Dec 18, 2019 at 8:19 AM Zhu Zhu  wrote:
>
>> Hi Pooja,
>>
>> I'm a bit confused since in 1) it says that "If two events have same
>> transaction_id, we can say that they are duplicates", and in 2) it says
>> that "Since this is just a value change, the transaction_id will be same".
>> Looks to me they are conflicting. Usually in case 2) scenarios, the value
>> updates event is considered as new event which does not share the unique id
>> with prior events.
>>
>> If each event has a unique transaction_id, you can use it to de-duplicate
>> the events via a set recording the transaction_id(s) which are
>> already processed. And then 2) would not be a problem with the unique
>> transaction_id assumption.
>>
>> Thanks,
>> Zhu Zhu
>>
>> Pooja Agrawal  于2019年12月17日周二 下午9:17写道:
>>
>>>
>>> Hi,
>>>
>>> I have a use case where we are reading events from kinesis stream.The
>>> event can look like this
>>> Event {
>>>   event_id,
>>>   transaction_id
>>>   key1,
>>>   key2,
>>>   value,
>>>   timestamp,
>>>   some other fields...
>>> }
>>>
>>> We want to aggregate the values per key for all events we have seen till
>>> now (as simple as "select key1, key2, sum(value) from events group by key1,
>>> key2key."). For this I have created a simple flink job which uses
>>> flink-kinesis connector and applies keyby() and sum() on the incoming
>>> events. I am facing two challenges here:
>>>
>>> 1) The incoming events can have duplicates. How to maintain exactly once
>>> processing here, as processing duplicate events can give me erroneous
>>> result? The field transaction_id will be unique for each events. If two
>>> events have same transaction_id, we can say that they are duplicates (By
>>> duplicates here, I don't just mean the retried ones. The same message can
>>> be present in kinesis with different sequence number. I am not sure if
>>> flink-kinesis connector can handle that, as it is using KCL underlying
>>> which I assume doesn't take care of it)
>>>
>>> 2) There can be the the cases where the value has been updated for a key
>>> after processing the event and we may want to reprocess those events with
>>> new value. Since this is just a value change, the transaction_id will be
>>> same. The idempotency logic will not allow to reprocess the events. What
>>> are the ways to handle such scenarios in flink?
>>>
>>> Thanks
>>> Pooja
>>>
>>>
>>> --
>>> Warm Regards,
>>> Pooja Agrawal
>>>
>>
>
> --
> Warm Regards,
> Pooja Agrawal
>


Re: How to reprocess certain events in Flink?

2019-12-17 Thread Pooja Agrawal
Hey,

I am sorry for the confusion. The value is not already present in the
event; we read it from a static table (a kind of data enrichment in the
Flink job), so the event above is an enriched event.
If we say that this is some transaction event, the user would have done the
transaction once, and hence the transaction_id is unique. But the table
from which we read the value may contain a wrong value (not always,
sometimes because of a bug). In this case, we may want to reprocess
that transaction event with the new value (here the transaction_id will be
the same as before, but the value will change). I hope this clears up the
scenario. Let me know if you have any other questions.

To solve the idempotency problem, you suggested maintaining a set recording
the transaction_id(s). Since we are aggregating over all events seen till now,
the number of events, and hence of ids, will be very large. I am assuming we
will need some external store here and a lookup every time we process an
event, which may increase latency. Can you suggest an efficient way to solve
this? And if we do need an external store, what would be the best candidate?

Thanks
Pooja



On Wed, Dec 18, 2019 at 8:19 AM Zhu Zhu  wrote:

> Hi Pooja,
>
> I'm a bit confused since in 1) it says that "If two events have same
> transaction_id, we can say that they are duplicates", and in 2) it says
> that "Since this is just a value change, the transaction_id will be same".
> Looks to me they are conflicting. Usually in case 2) scenarios, the value
> updates event is considered as new event which does not share the unique id
> with prior events.
>
> If each event has a unique transaction_id, you can use it to de-duplicate
> the events via a set recording the transaction_id(s) which are
> already processed. And then 2) would not be a problem with the unique
> transaction_id assumption.
>
> Thanks,
> Zhu Zhu
>
> Pooja Agrawal  于2019年12月17日周二 下午9:17写道:
>
>>
>> Hi,
>>
>> I have a use case where we are reading events from kinesis stream.The
>> event can look like this
>> Event {
>>   event_id,
>>   transaction_id
>>   key1,
>>   key2,
>>   value,
>>   timestamp,
>>   some other fields...
>> }
>>
>> We want to aggregate the values per key for all events we have seen till
>> now (as simple as "select key1, key2, sum(value) from events group by key1,
>> key2key."). For this I have created a simple flink job which uses
>> flink-kinesis connector and applies keyby() and sum() on the incoming
>> events. I am facing two challenges here:
>>
>> 1) The incoming events can have duplicates. How to maintain exactly once
>> processing here, as processing duplicate events can give me erroneous
>> result? The field transaction_id will be unique for each events. If two
>> events have same transaction_id, we can say that they are duplicates (By
>> duplicates here, I don't just mean the retried ones. The same message can
>> be present in kinesis with different sequence number. I am not sure if
>> flink-kinesis connector can handle that, as it is using KCL underlying
>> which I assume doesn't take care of it)
>>
>> 2) There can be the the cases where the value has been updated for a key
>> after processing the event and we may want to reprocess those events with
>> new value. Since this is just a value change, the transaction_id will be
>> same. The idempotency logic will not allow to reprocess the events. What
>> are the ways to handle such scenarios in flink?
>>
>> Thanks
>> Pooja
>>
>>
>> --
>> Warm Regards,
>> Pooja Agrawal
>>
>

-- 
Warm Regards,
Pooja Agrawal


Re: How to dynamically change the window size and type in Flink?

2019-12-17 Thread LakeShen
Try a custom Trigger; you can put your own business triggering logic inside the Trigger.
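
For illustration, a minimal sketch of such a Trigger over GlobalWindows, firing on a
processing-time interval that is looked up at runtime (getCurrentIntervalMillis() is a
hypothetical hook, e.g. backed by broadcast state or external config):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class DynamicIntervalTrigger extends Trigger<Object, GlobalWindow> {

    private final ValueStateDescriptor<Long> fireTimeDesc =
            new ValueStateDescriptor<>("fire-time", Long.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        ValueState<Long> fireTime = ctx.getPartitionedState(fireTimeDesc);
        if (fireTime.value() == null) {
            // first element of the current pane: schedule a firing after the current interval
            long t = ctx.getCurrentProcessingTime() + getCurrentIntervalMillis();
            ctx.registerProcessingTimeTimer(t);
            fireTime.update(t);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
            throws Exception {
        ctx.getPartitionedState(fireTimeDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> fireTime = ctx.getPartitionedState(fireTimeDesc);
        if (fireTime.value() != null) {
            ctx.deleteProcessingTimeTimer(fireTime.value());
            fireTime.clear();
        }
    }

    // hypothetical: decide the current window length at runtime, e.g. 5 or 10 minutes
    private long getCurrentIntervalMillis() {
        return 5 * 60 * 1000L;
    }
}

// usage: stream.keyBy(...).window(GlobalWindows.create()).trigger(new DynamicIntervalTrigger())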

陈帅 wrote on Sat, Dec 14, 2019 at 6:44 PM:

> Does Flink currently support changing the window size and type dynamically? For example,
> first aggregating over a 5-minute window and later switching to a 10-minute window.
>


Re: Question about assigning watermarks directly on the source vs. after a flatMap

2019-12-17 Thread LakeShen
In your flatMap logic, do you touch the records' timestamps? A watermark only advances when the new watermark is non-null and its timestamp is larger than the previous one's.
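
For reference, a minimal sketch of assigning timestamps and watermarks after the flatMap
(MyEvent, MyFlatMap, the eventTime getter and the 5-second bound are assumptions; the
exactly-once guarantee for the Kafka source comes from checkpointing and is not affected
by where the assigner sits):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<MyEvent> withWatermarks = env
        .addSource(flinkKafkaConsumer)
        .flatMap(new MyFlatMap())   // may emit 1..N elements per input record
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(MyEvent e) {
                        // every emitted element must carry a usable event time
                        return e.getEventTime();
                    }
                });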

猫猫 <16770...@qq.com> wrote on Wed, Dec 18, 2019 at 9:27 AM:

> env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
>
> env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...);
>
> When consuming from Kafka, what is the difference, and what are the implications, between assigning the watermark directly on the source and assigning it after flatMap()?
> flatMap may turn one input record into 1-N records; in that case, can Kafka exactly-once processing still be guaranteed?


Re: RichAsyncFunction Timeout

2019-12-17 Thread Biao Liu
Hi Polarisary,

It's hard to tell what happened without further details. Just some guesses:
1. Have you completed the "resultFuture" in "asyncInvoke"? I'm asking because only part
of the "asyncInvoke" implementation is shown, so I can't see the completion part.
2. Is the capacity (10) of the async waiting queue large enough? The time spent waiting
for the queue to have capacity is also part of the timeout calculation. It seems this
behavior has been changed on the master branch recently; I'm not sure whether that change
is included in your version.
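
For point 1, the general shape is that every "asyncInvoke" call must eventually complete
(or exceptionally complete) its future, e.g. (queryHBase() is a hypothetical blocking
lookup, the output type is simplified to Row, and java.util.Collections /
java.util.concurrent.CompletableFuture are assumed to be imported):

@Override
public void asyncInvoke(Tuple1<String> input, ResultFuture<Row> resultFuture) {
    CompletableFuture
            .supplyAsync(() -> queryHBase(input.f0))   // hypothetical blocking HBase lookup
            .whenComplete((row, error) -> {
                if (error != null) {
                    // completing exceptionally also counts as completed;
                    // otherwise the request will simply run into the timeout
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singleton(row));
                }
            });
}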

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 11:09, Polarisary  wrote:

> Hi ALL,
> When I use RichAsyncFunction read data from hbase, it always timeout after
> a few minutes. but the hbase connection is not close, it also can get data
> in the override method timeout.
>
> Following is the code, does somebody know why trigger timeout.
>
> 
>
> AsyncDataStream.unorderedWait(uidDs, new AsyncHBaseRequest(hTableName,
> 
> HBaseConfigurationUtil.serializeConfiguration(hbaseClientConf), hbaseSchema)
> , 5, TimeUnit.MINUTES, 10)
>
>
>
> @Override
> public void timeout(Tuple1 input, ResultFuture Short, Short, Long, Integer, Long>> resultFuture) throws Exception {
>
> Row r = 
> readHelper.parseToRow(table.get(readHelper.createGet("13491261515587439bf2f217")));
> logger.error("Timeout Error, input [{}], conn {}, row [{}]", input.f0, 
> hConnection.isClosed(), r.toString());
> }
>
> @Override
> public void asyncInvoke(Tuple1 input, ResultFuture Short, Short, Long, Integer, Long>> resultFuture) throws Exception {
> FamilyFilter filter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new 
> BinaryComparator(Bytes.toBytes("f1")));
> String rkStart = 
> UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 0);
> String rkEnd = 
> UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 9L);
>
>
>
>
> polaris...@gmail.com
>
>
>
>
>


Re: RichAsyncFunction Timeout

2019-12-17 Thread vino yang
Hi  Polarisary,

IMO, firstly, it would be better to monitor the OS and Flink/HBase metrics.
For example:


   - Flink and HBase cluster Network I/O metrics;
   - Flink TM CPU/Memory/Backpressure metrics and so on;

You can view these metrics to find some potential reasons. If you can not
figure it out, you can share these metrics with the community.

Best,
Vino

Polarisary wrote on Wed, Dec 18, 2019 at 11:09 AM:

> Hi ALL,
> When I use RichAsyncFunction read data from hbase, it always timeout after
> a few minutes. but the hbase connection is not close, it also can get data
> in the override method timeout.
>
> Following is the code, does somebody know why trigger timeout.
>
> 
>
> AsyncDataStream.unorderedWait(uidDs, new AsyncHBaseRequest(hTableName,
> 
> HBaseConfigurationUtil.serializeConfiguration(hbaseClientConf), hbaseSchema)
> , 5, TimeUnit.MINUTES, 10)
>
>
>
> @Override
> public void timeout(Tuple1 input, ResultFuture Short, Short, Long, Integer, Long>> resultFuture) throws Exception {
>
> Row r = 
> readHelper.parseToRow(table.get(readHelper.createGet("13491261515587439bf2f217")));
> logger.error("Timeout Error, input [{}], conn {}, row [{}]", input.f0, 
> hConnection.isClosed(), r.toString());
> }
>
> @Override
> public void asyncInvoke(Tuple1 input, ResultFuture Short, Short, Long, Integer, Long>> resultFuture) throws Exception {
> FamilyFilter filter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new 
> BinaryComparator(Bytes.toBytes("f1")));
> String rkStart = 
> UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 0);
> String rkEnd = 
> UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 9L);
>
>
>
>
> polaris...@gmail.com
>
>
>
>
>


RichAsyncFunction Timeout

2019-12-17 Thread Polarisary
Hi ALL,
When I use a RichAsyncFunction to read data from HBase, it always times out after a
few minutes, but the HBase connection is not closed, and I can still get data inside the
overridden timeout() method.

The code is below; does somebody know why the timeout is triggered?


AsyncDataStream.unorderedWait(uidDs,
        new AsyncHBaseRequest(hTableName,
                HBaseConfigurationUtil.serializeConfiguration(hbaseClientConf), hbaseSchema),
        5, TimeUnit.MINUTES, 10)

@Override
public void timeout(Tuple1 input, ResultFuture> resultFuture) throws Exception {
    Row r = readHelper.parseToRow(table.get(readHelper.createGet("13491261515587439bf2f217")));
    logger.error("Timeout Error, input [{}], conn {}, row [{}]", input.f0, hConnection.isClosed(), r.toString());
}

@Override
public void asyncInvoke(Tuple1 input, ResultFuture> resultFuture) throws Exception {
    FamilyFilter filter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("f1")));
    String rkStart = UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 0);
    String rkEnd = UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 9L);




polaris...@gmail.com






Re: Different jobName per Job when reporting Flink metrics to PushGateway

2019-12-17 Thread Zhu Zhu
Hi Sidney,

"metrics.reporter.promgateway.jobName" is a Flink cluster wide config, so
you will need to set it in flink-conf.yaml before launching the Flink
cluster.
An alternative is to use -D(or -yD for yarn) params to override the config
when running a command to launch the Flink session cluster or submit a job
in job cluster mode.

Thanks,
Zhu Zhu

Sidney Feiner wrote on Tue, Dec 17, 2019 at 11:08 PM:

> I'm using Flink 1.9.1 with PrometheusPushGateway to report my metrics. The
> jobName the metrics are reported with is defined in the flink-conf.yaml
> file which makes the jobName identical for all jobs who run on the cluster,
> but I want a different jobName to be reported for every running job. To do
> so, I tried doing the following in my code before executing the Stream:
>
> Configuration conf = GlobalConfiguration.loadConfiguration();
> conf.setString(
> "metrics.reporter.promgateway.jobName",
> conf.getString("metrics.reporter.promgateway.jobName", "") + "-" 
> + pipeline
> );
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setGlobalJobParameters(conf);
>
> When pipeline is a String variable.
>
> When running the job locally, it worked. But now I'm running flink in High
> Availability mode and it doesn't work anymore :( The config I override in
> the code is ignored.
>
> So how can I change the jobName per job? And if I can't, is there a way to
> set additional Labels when reporting the metrics? Because I haven't seen an
> option for that as well.
>
> Thanks :)
>
>
> I've posted this on StackOverflow as well - here
> 
> :)
>
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
> [image: emailsignature]
>
>


Re: MiniCluster with ProcessingTimeTrigger

2019-12-17 Thread Biao Liu
Hi John,

The root cause is that the collection source exits too quickly, so the window operator
also exits without ever being triggered.

You can verify that by holding the pipeline open for a moment before the window operator
shuts down. For example, insert a map operator between the source and the window operator
and block for a second or more in that map operator's "close" method; you should then see
the window fire as expected.
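
A minimal sketch of such a map operator (the 2-second sleep is arbitrary, just longer
than the 1-second window used in the test):

import org.apache.flink.api.common.functions.RichMapFunction;

// identity map that delays operator shutdown so the processing-time window can still fire
private static class HoldBackMap extends RichMapFunction<Integer, Integer> {
    @Override
    public Integer map(Integer value) {
        return value;
    }

    @Override
    public void close() throws Exception {
        Thread.sleep(2000L);
        super.close();
    }
}

// usage in the test: env.fromCollection(listOfNumbers).map(new HoldBackMap()).windowAll(...)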

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 06:24, John Morrow  wrote:

> Hi All,
>
> I'm trying to test a pipeline that consists of two Flink tasks with a
> MiniCluster. The 1st task has a WindowAll operator which groups items into
> batches every second, and the 2nd task does an async operation with each
> batch and flatMaps the result.
>
> I've whittled it down to the bare bones below. There are two tests:
>
>- testPipelineWithCountTrigger - this one works fine 
>- testPipelineWithProcessingTimeTrigger - this one doesn't give any
>output 
>
>
> It seems like a timing issue. If I step through the failing one slowly I
> can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear
> methods do get called, and the asyncInvoke method also gets called, but
> when I run it the 2nd test fails as it produces no output. I've tried
> setting the MiniCluster timeout to 1 day, the same with my AsyncUDF
> timeout, and sleeping for 3 * window after env.execute but no difference.
> I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build
> 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).
>
>
> Any idea how I can get the 2nd test to wait to process the output?
>
>
> Thanks 
>
> John.
>
>
>
>
>
>
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import
> org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> import
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.test.util.MiniClusterWithClientResource;
> import org.apache.flink.util.Collector;
> import org.junit.jupiter.api.Tag;
> import org.junit.jupiter.api.Test;
>
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
> import java.util.stream.StreamSupport;
>
> import static org.junit.jupiter.api.Assertions.assertEquals;
>
>
> public class StreamTest {
>
>   @Test // :)
>   @Tag("unit")
>   public void testPipelineWithCountTrigger() throws Exception {
> runPipeline(10, CountTrigger.of(10));
>   }
>
>   @Test // :(
>   @Tag("unit")
>   public void testPipelineWithProcessingTimeTrigger() throws Exception {
> runPipeline(10, ProcessingTimeTrigger.create());
>   }
>
>
>   private void runPipeline(int inputSize, Trigger
> trigger) throws Exception {
>
> MiniClusterWithClientResource miniCluster = new
> MiniClusterWithClientResource(
> new MiniClusterResourceConfiguration.Builder()
> .setNumberSlotsPerTaskManager(1)
> .setNumberTaskManagers(1)
>
> .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1,
> TimeUnit.DAYS))
> .build()
> );
> miniCluster.before();
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> CollectSink.values.clear();
>
> List listOfNumbers = IntStream.rangeClosed(1,
> inputSize).boxed().collect(Collectors.toList());
>
> // 1st half of pipeline
> DataStream> pipeA = env.fromCollection(listOfNumbers)
> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
> .trigger(trigger)
> .process(new Batcher());
>
> // 2nd half of pipeline
> DataStream pipeB = AsyncDataStream.unorderedWait(pipeA, new
> AsyncUDF(), 1L, TimeUnit.DAYS, 1 )
> .flatMap((List records, Collector out) ->
> records.forEach(out::collect)).returns(Types.INT);
> pipeB.addSink(new CollectSink());
>
> env.execute();
>
> try {
>   Thread.sleep(1000L * 3);
> } catch (InterruptedException e) {
>   System.out.println();
> }
> 

Re: How to reprocess certain events in Flink?

2019-12-17 Thread Zhu Zhu
Hi Pooja,

I'm a bit confused, since 1) says "If two events have same
transaction_id, we can say that they are duplicates", while 2) says
"Since this is just a value change, the transaction_id will be same".
These look conflicting to me. Usually, in case 2) scenarios, the
value-update event is considered a new event that does not share the
unique id with prior events.

If each event has a unique transaction_id, you can use it to de-duplicate
the events via a set recording the transaction_id(s) that have already
been processed. Then 2) would not be a problem under the unique
transaction_id assumption.

Thanks,
Zhu Zhu

Pooja Agrawal wrote on Tue, Dec 17, 2019 at 9:17 PM:

>
> Hi,
>
> I have a use case where we are reading events from kinesis stream.The
> event can look like this
> Event {
>   event_id,
>   transaction_id
>   key1,
>   key2,
>   value,
>   timestamp,
>   some other fields...
> }
>
> We want to aggregate the values per key for all events we have seen till
> now (as simple as "select key1, key2, sum(value) from events group by key1,
> key2key."). For this I have created a simple flink job which uses
> flink-kinesis connector and applies keyby() and sum() on the incoming
> events. I am facing two challenges here:
>
> 1) The incoming events can have duplicates. How to maintain exactly once
> processing here, as processing duplicate events can give me erroneous
> result? The field transaction_id will be unique for each events. If two
> events have same transaction_id, we can say that they are duplicates (By
> duplicates here, I don't just mean the retried ones. The same message can
> be present in kinesis with different sequence number. I am not sure if
> flink-kinesis connector can handle that, as it is using KCL underlying
> which I assume doesn't take care of it)
>
> 2) There can be the the cases where the value has been updated for a key
> after processing the event and we may want to reprocess those events with
> new value. Since this is just a value change, the transaction_id will be
> same. The idempotency logic will not allow to reprocess the events. What
> are the ways to handle such scenarios in flink?
>
> Thanks
> Pooja
>
>
> --
> Warm Regards,
> Pooja Agrawal
>


Re: Registering a Confluent Schema Registry Avro topic as a table in Flink SQL

2019-12-17 Thread 朱广彬
Hi 陈帅,

The community indeed does not support the Confluent Schema Registry Avro format yet. Internally
we also rely on the schema registry to manage Avro schemas, so we modified the flink-avro source
code to support it.

The main places involved are:
org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema}
and org.apache.flink.table.descriptors.{Avro,AvroValidator}

When using it, just specify the following three registry-related parameters when building the Avro
descriptor (the useRegistry / registryUrl / registrySubject calls below):

tableEnv.connect(
new Kafka()
.version("universal")
.topic(topic)
.properties(props)
).withFormat(
new Avro()
  .useRegistry(true)
  .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS)
  .registrySubject(subject)
  .avroSchema(avroSchemaStr)
)
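
As a side note, if I remember correctly the flink-avro-confluent-registry module already ships a
DataStream-API deserialization schema that resolves the writer schema from a Confluent registry;
a rough sketch (topic, props, avroSchemaStr and KAFKA_SCHEMA_REGISTRY_URL_ADDRESS are reused from
above as placeholders):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Schema readerSchema = new Schema.Parser().parse(avroSchemaStr);
FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
        topic,
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, KAFKA_SCHEMA_REGISTRY_URL_ADDRESS),
        props);
DataStream<GenericRecord> stream = env.addSource(consumer);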


陈帅 wrote on Wed, Dec 18, 2019 at 8:26 AM:
>
> Can Flink SQL register a topic whose data is in an Avro format registered with the Confluent
> Schema Registry as a table?


Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-17 Thread vino yang
Hi Ethan,

Share two things:


   - I have found that the "taskmanager.memory.preallocate" config option has been
     removed in the master codebase.
   - After researching the git history, I found that the description of
     "taskmanager.memory.preallocate" was written by @Chesnay Schepler (on the 1.8
     branch), so maybe he can give more context or information. Correct me if I am wrong.

Best,
Vino.

Ethan Li wrote on Wed, Dec 18, 2019 at 10:07 AM:

> I didn’t realize we was not chatting in the mailing list :)
>
> I think it’s wrong because it kind of says full GC is triggered by
> reaching MaxDirecMemorySize.
>
>
> On Dec 16, 2019, at 11:03 PM, Xintong Song  wrote:
>
> Glad that helped. I'm also posting this conversation to the public mailing
> list, in case other people have similar questions.
>
> And regarding the GC statement, I think the document is correct.
> - Flink Memory Manager guarantees that the amount of allocated managed
> memory never exceed the configured capacity, thus managed memory allocation
> should not trigger OOM.
> - When preallocation is enabled, managed memory segments are allocated and
> pooled by Flink Memory Manager, no matter there are tasks requesting them
> or not. The segments will not be deallocated until the cluster is shutdown.
> - When preallocation is disabled, managed memory segments are allocated
> only when tasks requesting them, and destroyed immediately when tasks
> return them to the Memory Manager. However, what this statement trying to
> say is that, the memory is not deallocated directly when the memory segment
> is destroyed, but will have to wait until the GC to be truly released.
>
> Thank you~
> Xintong Song
>
>
>
> On Tue, Dec 17, 2019 at 12:30 PM Ethan Li 
> wrote:
>
>> Thank you very much Xintong! It’s much clear to me now.
>>
>> I am still on standalone cluster setup.  Before I was using 350GB on-heap
>> memory on a 378GB box. I saw a lot of swap activities. Now I understand
>> that it’s because RocksDB didn’t have enough memory to use, so OS forces
>> JVM to swap. It can explain why the cluster was not stable and kept
>> crashing.
>>
>> Now that I put 150GB off-heap and 150GB on-heap, the cluster is more
>> stable than before. I thought it was because GC was reduced because now we
>> have less heap memory. Now I understand that it’s because I have 78GB
>> memory available for rocksDB to use, 50GB more than before. And it explains
>> why I don’t see swaps anymore.
>>
>> This makes sense to me now. I just have to set preallocation to false to
>> use the other 150 GB off-heap memory for rocksDB and do some tuning on
>> these memory configs.
>>
>>
>> One thing I noticed is that in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-memory-preallocate
>>
>>  If this configuration is set to false cleaning up of the allocated
>> off-heap memory happens only when the configured JVM parameter
>> MaxDirectMemorySize is reached by triggering a full GC
>>
>> I think this statement is not correct. GC is not trigged by reaching
>> MaxDirectMemorySize. It will throw "java.lang.OutOfMemoryError: Direct
>> buffer memory” if MaxDirectMemorySize is reached.
>>
>> Thank you again for your help!
>>
>> Best,
>> Ethan
>>
>>
>> On Dec 16, 2019, at 9:44 PM, Xintong Song  wrote:
>>
>> Hi Ethan,
>>
>> When you say "it's doing better than before", what is your setups before?
>> Is it on-heap managed memory? With preallocation enabled or disabled? Also,
>> what deployment (standalone, yarn, or local executor) do you run Flink on?
>> It's hard to tell why the performance becomes better without knowing the
>> information above.
>>
>> Since you are using RocksDB, and configure managed memory to off-heap,
>> you should set pre-allocation to false. Steaming job with RocksDB state
>> backend does not use managed memory at all. Setting managed memory to
>> off-heap only makes Flink to launch JVM with smaller heap space, leaving
>> more space outside JVM. Setting pre-allocation to false makes Flink
>> allocate those managed memory on-demand, and since there's no demand the
>> managed memory will not be allocated. Therefore, the memory space left
>> outside JVM can be fully leveraged by RocksDB.
>>
>> Regarding related source codes, I would recommend the following:
>> - MemoryManager - For how managed memory is allocated / used. Related to
>> pre-allocation.
>> - ContaineredTaskManagerParameters - For how the JVM memory parameters
>> are decided. Related to on-heap / off-heap managed memory.
>> - TaskManagerServices#fromConfiguration - For how different components
>> are created, as well as how their memory sizes are decided. Also related to
>> on-heap / off-heap managed memory.
>>
>> Thank you~
>> Xintong Song
>>
>>
>>
>> On Tue, Dec 17, 2019 at 11:00 AM Ethan Li 
>> wrote:
>>
>>> Thank you Xintong, Vino for taking your time answering my question. I
>>> didn’t know managed memory is only for batch jobs.
>>>
>>>
>>>
>>> I tried to set to use off-heap Flink managed memory 

Re: Questions about taskmanager.memory.off-heap and taskmanager.memory.preallocate

2019-12-17 Thread Ethan Li
I didn’t realize we were not chatting on the mailing list :)

I think it’s wrong because it kind of says a full GC is triggered by reaching
MaxDirectMemorySize.


> On Dec 16, 2019, at 11:03 PM, Xintong Song  wrote:
> 
> Glad that helped. I'm also posting this conversation to the public mailing 
> list, in case other people have similar questions.
> 
> And regarding the GC statement, I think the document is correct.
> - Flink Memory Manager guarantees that the amount of allocated managed memory 
> never exceed the configured capacity, thus managed memory allocation should 
> not trigger OOM.
> - When preallocation is enabled, managed memory segments are allocated and 
> pooled by Flink Memory Manager, no matter there are tasks requesting them or 
> not. The segments will not be deallocated until the cluster is shutdown.
> - When preallocation is disabled, managed memory segments are allocated only 
> when tasks requesting them, and destroyed immediately when tasks return them 
> to the Memory Manager. However, what this statement trying to say is that, 
> the memory is not deallocated directly when the memory segment is destroyed, 
> but will have to wait until the GC to be truly released.
> 
> Thank you~
> Xintong Song
> 
> 
> On Tue, Dec 17, 2019 at 12:30 PM Ethan Li  > wrote:
> Thank you very much Xintong! It’s much clear to me now. 
> 
> I am still on standalone cluster setup.  Before I was using 350GB on-heap 
> memory on a 378GB box. I saw a lot of swap activities. Now I understand that 
> it’s because RocksDB didn’t have enough memory to use, so OS forces JVM to 
> swap. It can explain why the cluster was not stable and kept crashing.
> 
> Now that I put 150GB off-heap and 150GB on-heap, the cluster is more stable 
> than before. I thought it was because GC was reduced because now we have less 
> heap memory. Now I understand that it’s because I have 78GB memory available 
> for rocksDB to use, 50GB more than before. And it explains why I don’t see 
> swaps anymore. 
> 
> This makes sense to me now. I just have to set preallocation to false to use 
> the other 150 GB off-heap memory for rocksDB and do some tuning on these 
> memory configs. 
> 
> 
> One thing I noticed is that in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#taskmanager-memory-preallocate
>  
> 
> 
>  If this configuration is set to false cleaning up of the allocated off-heap 
> memory happens only when the configured JVM parameter MaxDirectMemorySize is 
> reached by triggering a full GC
> 
> I think this statement is not correct. GC is not trigged by reaching 
> MaxDirectMemorySize. It will throw "java.lang.OutOfMemoryError: Direct buffer 
> memory” if MaxDirectMemorySize is reached. 
> 
> Thank you again for your help!
> 
> Best,
> Ethan
> 
> 
>> On Dec 16, 2019, at 9:44 PM, Xintong Song > > wrote:
>> 
>> Hi Ethan,
>> 
>> When you say "it's doing better than before", what is your setups before? Is 
>> it on-heap managed memory? With preallocation enabled or disabled? Also, 
>> what deployment (standalone, yarn, or local executor) do you run Flink on? 
>> It's hard to tell why the performance becomes better without knowing the 
>> information above.
>> 
>> Since you are using RocksDB, and configure managed memory to off-heap, you 
>> should set pre-allocation to false. Steaming job with RocksDB state backend 
>> does not use managed memory at all. Setting managed memory to off-heap only 
>> makes Flink to launch JVM with smaller heap space, leaving more space 
>> outside JVM. Setting pre-allocation to false makes Flink allocate those 
>> managed memory on-demand, and since there's no demand the managed memory 
>> will not be allocated. Therefore, the memory space left outside JVM can be 
>> fully leveraged by RocksDB.
>> 
>> Regarding related source codes, I would recommend the following:
>> - MemoryManager - For how managed memory is allocated / used. Related to 
>> pre-allocation.
>> - ContaineredTaskManagerParameters - For how the JVM memory parameters are 
>> decided. Related to on-heap / off-heap managed memory.
>> - TaskManagerServices#fromConfiguration - For how different components are 
>> created, as well as how their memory sizes are decided. Also related to 
>> on-heap / off-heap managed memory.
>> 
>> Thank you~
>> Xintong Song
>> 
>> 
>> On Tue, Dec 17, 2019 at 11:00 AM Ethan Li > > wrote:
>> Thank you Xintong, Vino for taking your time answering my question. I didn’t 
>> know managed memory is only for batch jobs.
>> 
>> 
>> 
>> I tried to set to use off-heap Flink managed memory (with preallocation to 
>> true) and it’s doing better than before. It would not make sense if managed 
>> memory is not used. I was confused. Then I found this doc 
>> 

Question about assigning watermarks directly on the source vs. after a flatMap

2019-12-17 Thread 猫猫
env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...);

When consuming from Kafka, what is the difference, and what are the implications, between assigning the watermark directly on the source and assigning it after flatMap()?
flatMap may turn one input record into 1-N records; in that case, can Kafka exactly-once processing still be guaranteed?

Registering a Confluent Schema Registry Avro topic as a table in Flink SQL

2019-12-17 Thread 陈帅
Can Flink SQL register a topic whose data is in an Avro format registered with the Confluent Schema Registry as a table?


JSON representation of the Types.SQL_TIMESTAMP type in a Flink Table schema

2019-12-17 Thread 陈帅
When a field declared as Types.SQL_TIMESTAMP in a Flink Table API schema is represented in JSON,
does it have to use the yyyy-MM-dd'T'HH:mm:ss.SSS'Z' format?
Sample program:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
TableConfig tableConfig = tEnv.getConfig();

tableConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));

tEnv.registerFunction("DateUtil",new DateUtil());
tEnv.connect(
new Kafka()
.version("universal")
//   "0.8", "0.9", "0.10", "0.11", and "universal"
.topic("jsontest")
.property("bootstrap.servers", "localhost:9092")
.property("group.id","test")
.startFromLatest()
)
.withFormat(
new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(
    new Schema()
    .field("rowtime", Types.SQL_TIMESTAMP)
    .rowtime(new Rowtime()
        .timestampsFromField("eventtime")
        .watermarksPeriodicBounded(2000))
.field("fruit", Types.STRING)
.field("number", Types.INT)
)
.inAppendMode()
.registerTableSource("source");

Table sourceTbl = tEnv.scan("source");
sourceTbl.printSchema();
tEnv.toAppendStream(sourceTbl, Row.class).print();

env.execute();

Test data: {"eventtime": "2019-12-17T11:11:29.555Z", "fruit": "orange", "number": 45}
I have two questions about the highlighted part of the code:
[1] In this example I want to use event time (the "eventtime" field), so does the
field("rowtime", Types.SQL_TIMESTAMP) declaration have to use the name "rowtime"?
[2] How would processing time be expressed? The incoming JSON field is actually called
"eventtime", and for the format I first tried epoch millis as a long, which did not work,
then yyyy-MM-dd HH:mm:ss, which did not work either; only after reading the source code and
using yyyy-MM-dd'T'HH:mm:ss.SSS'Z' did it get through. Is there a way to specify a custom dateFormat?


Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-17 Thread KristoffSC
Thank you for your reply Timo.

Regarding point 2. I'm sorry for the delay. I rerun my test and everything
seems to be in order. Open method was called as first. I guess it was a
false alarm. Sorry for that.

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


MapState with List Type for values

2019-12-17 Thread Aaron Langford
Hello Flink Community,

I have a question about using MapState with lists as values. Here is a
description of the use case:

I have an operator over a keyed stream where for each record that comes, it
needs to look into some state to determine if a value has arrived or not.
If the value has not arrived yet, the record needs to be en-queued for when
the value will eventually arrive. When that value does arrive, the queued
records need to be flushed, and associated list state cleared.

Here's a schematic representation of the stream (letters stand for the different record kinds):
-
A B B B ...
-
Where C must wait for at least 1 A to be output (otherwise be queued) and
B must wait for a D to be output (otherwise be queued). If you can't tell,
this is basically/sort of a join without a window.

The output should be something abstractly like this:

-
{C,A}{C,A}{C,A}{B,D}{B,D}{B,D}...
-

Many records might be en-queued while waiting for a given value. Many
records may be waiting for many different values, but any record that is
en-queued will only be waiting for a record of one type to show up.

Records on this stream are keyed by some shared parent key, and I have
reasons to avoid keying the stream on the "join key" as it were. Namely I'm
getting a CDC stream with a lot of different tables, and I want to avoid a
topology with N operators for N different tables if I can.

If I lean on MapState with a Seq as the value type to get this done for me, then my job
suffers considerably in terms of performance. I believe one of the biggest
bottlenecks is that each time I need to interact with a Seq
(like for appends), I must deserialize the entire list.

Is there a way to set up a MapState whose value is a ListState? Or is there
any guidance for how I might serialize/deserialize a list type in the
MapState in such a way that appends aren't so expensive? Open to other
suggestions/approaches as well.

Aaron


Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot

2019-12-17 Thread KristoffSC
Hi community, 
I'm trying to build a PoC pipeline for my project and I have few questions
regarding load balancing between task managers and ensuring that keyed
stream events for the same key will go to the same Task Manager (hence the
same task slot).

Lets assume that we have 3 task managers, 3 task slot each. So it gives us 9
task slots in total.
The source is a Kafka topic with N partitions. Events are "linked" with each
other by a transactionId (long) field, so they can be keyed by this field.
Events for a particular transactionId can be spread across many partitions
(we don't have control over this).

The pipeline is:
1. Kafka Source -> produces RawEvents (map operator).
2. Enrichment with AsuncFuntion(simple DB/cache call) produces
EnrichedEvents with map operator.
3. Key EnrichedEvents by transactionId, buffer events for some time, sort them by
sequenceNumber (window aggregation) and emit a new event based on those:
N sorted EnrichedEvents produce one TransactionEvent for this
transactionId.
4. Sink TransactionEvents

Requirements:
1. Have high task slot utilization (Low number of idle/un-addressed task
slots).
2. EnrichedEvents for the same transactionId should go to the same TaskSlot
(hence the same TaskManager).

Question:
How this can be achieved?
How parallelism value for each operator should be set?

Note:
Probably I can already key the original RawEvents on transactionId.
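
For concreteness, a rough skeleton of the pipeline described above (class and variable
names are made up); keyBy() hash-partitions the stream, so all events with the same
transactionId end up in the same parallel subtask and hence the same task slot:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<RawEvent> raw = env
        .addSource(kafkaSource);                        // N Kafka partitions

DataStream<EnrichedEvent> enriched = AsyncDataStream
        .unorderedWait(raw, new EnrichmentFunction(), 1, TimeUnit.SECONDS, 100);

enriched
        .keyBy(e -> e.getTransactionId())               // same transactionId -> same subtask
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new SortAndEmitTransaction())          // sort by sequenceNumber, emit one TransactionEvent
        .addSink(transactionSink);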

Thanks,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


MiniCluster with ProcessingTimeTrigger

2019-12-17 Thread John Morrow
Hi All,

I'm trying to test a pipeline that consists of two Flink tasks with a 
MiniCluster. The 1st task has a WindowAll operator which groups items into 
batches every second, and the 2nd task does an async operation with each batch 
and flatMaps the result.

I've whittled it down to the bare bones below. There are two tests:

  *   testPipelineWithCountTrigger - this one works fine 
  *   testPipelineWithProcessingTimeTrigger - this one doesn't give any output 

It seems like a timing issue. If I step through the failing one slowly I can 
see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear methods 
do get called, and the asyncInvoke method also gets called, but when I run it 
the 2nd test fails as it produces no output. I've tried setting the MiniCluster 
timeout to 1 day, the same with my AsyncUDF timeout, and sleeping for 3 * 
window after env.execute but no difference. I'm running this with Flink 1.9.0 
and OpenJDK8 on Ubuntu (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).


Any idea how I can get the 2nd test to wait to process the output?


Thanks 

John.






import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.junit.jupiter.api.Assertions.assertEquals;


public class StreamTest {

  @Test // :)
  @Tag("unit")
  public void testPipelineWithCountTrigger() throws Exception {
runPipeline(10, CountTrigger.of(10));
  }

  @Test // :(
  @Tag("unit")
  public void testPipelineWithProcessingTimeTrigger() throws Exception {
runPipeline(10, ProcessingTimeTrigger.create());
  }


  private void runPipeline(int inputSize, Trigger trigger) 
throws Exception {

MiniClusterWithClientResource miniCluster = new 
MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, 
TimeUnit.DAYS))
.build()
);
miniCluster.before();

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
CollectSink.values.clear();

List listOfNumbers = IntStream.rangeClosed(1, 
inputSize).boxed().collect(Collectors.toList());

// 1st half of pipeline
DataStream> pipeA = env.fromCollection(listOfNumbers)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(trigger)
.process(new Batcher());

// 2nd half of pipeline
DataStream pipeB = AsyncDataStream.unorderedWait(pipeA, new 
AsyncUDF(), 1L, TimeUnit.DAYS, 1 )
.flatMap((List records, Collector out) -> 
records.forEach(out::collect)).returns(Types.INT);
pipeB.addSink(new CollectSink());

env.execute();

try {
  Thread.sleep(1000L * 3);
} catch (InterruptedException e) {
  System.out.println();
}
miniCluster.after();

assertEquals(inputSize, CollectSink.values.size());
  }


  public static class Batcher extends ProcessAllWindowFunction, TimeWindow> {
@Override
public void process(Context context, Iterable elements, 
Collector> out) throws Exception {
  out.collect(StreamSupport.stream(elements.spliterator(), 
false).collect(Collectors.toList()));
}
  }

  private static class AsyncUDF extends RichAsyncFunction, 
List> {

private CompletableFuture> doAsyncStuff(List value) {
  return CompletableFuture.supplyAsync(() -> {
try {
  Thread.sleep(100);
} catch (InterruptedException e) {
  

Restore metrics on broadcast state after restart

2019-12-17 Thread Gaël Renoux
Hi everyone

I have a KeyedBroadcastProcessFunction with a broadcast state (a bunch of
rules), and I have set up a few gauge metrics on that state (things such as
the number of known rules and the timestamp of the last rule received). However,
I run into an issue when the job restarts from a checkpoint or a savepoint:
the metric values are not restored.

That's nothing anomalous: the fields used in the metrics are transient, not
part of the state (I have followed this doc:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types).
The fields will be reset to the proper value in the next call to
processBroadcastElement(), but that's not enough for my use case: rules
updates aren't that frequent (it could be minutes or even hours before the
next one). We can't have the metrics offline for that long.
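
(For context, the gauges are registered roughly like this; a simplified sketch of the
setup described above, with made-up field names:)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

// transient fields backing the gauges; after a restore they fall back to their defaults
// and are only refreshed in processBroadcastElement()
private transient long knownRules;
private transient long lastRuleTimestamp;

@Override
public void open(Configuration parameters) {
    getRuntimeContext().getMetricGroup()
            .gauge("knownRules", (Gauge<Long>) () -> knownRules);
    getRuntimeContext().getMetricGroup()
            .gauge("lastRuleTimestamp", (Gauge<Long>) () -> lastRuleTimestamp);
}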

Is there any way to reset those fields without waiting for the next
messages to arrive? The open() method doesn't have access to the broadcast
state, so I can't do it there. I could do it in processElement() (normal
element are much more frequent than rules), but it's far from ideal:
- it would be done again and again for every single element received, which
is overkill;
- it could only update the metric on the current subtask, not the others,
so one subtask could lag behind.

Am I missing something here ? Is there any way to trigger a reset of the
value when the broadcast state is reconstructed ?

Thanks for any help,
Gaël Renoux


Different jobName per Job when reporting Flink metrics to PushGateway

2019-12-17 Thread Sidney Feiner
I'm using Flink 1.9.1 with the PrometheusPushGateway to report my metrics. The 
jobName the metrics are reported with is defined in the flink-conf.yaml file, 
which makes the jobName identical for all jobs that run on the cluster, but I 
want a different jobName to be reported for every running job. To do so, I 
tried doing the following in my code before executing the stream:

Configuration conf = GlobalConfiguration.loadConfiguration();
conf.setString(
"metrics.reporter.promgateway.jobName",
conf.getString("metrics.reporter.promgateway.jobName", "") + "-" + 
pipeline
);
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(conf);


When pipeline is a String variable.

When running the job locally, it worked. But now I'm running flink in High 
Availability mode and it doesn't work anymore :( The config I override in the 
code is ignored.

So how can I change the jobName per job? And if I can't, is there a way to set 
additional Labels when reporting the metrics? Because I haven't seen an option 
for that as well.

Thanks :)


I've posted this on StackOverflow as well - 
here
 :)



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




Flink x GATE Integration

2019-12-17 Thread Austin Cawley-Edwards
Hi all,

GATE (General Architecture for Text Engineering)[1] is a text processing
framework that runs on the JVM.

Cross-posting from the GATE community.[2]

I'm wondering if anyone has tried to integrate GATE with Flink and, if so,
how successful has the integration been? I'm completely new to GATE, but
trying to explore the options out there for good text processing within
Flink.


Best,

Austin Cawley-Edwards


[1]: https://gate.ac.uk
[2]:
https://groups.io/g/gate-users/topic/usage_with_data_processing/68761558?p=,,,20,0,0,0::recentpostdate%2Fsticky,,,20,2,0,68761558


[SQL] [TableAPI] Table schema type mismatch between Table.sqlQuery(sql) and the TableSink

2019-12-17 Thread aven . wu

Hi!
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Field types of query result and registered TableSink [aggregationTableSink] do 
not match.

SQL = SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as 
tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND)
Using table.sqlQuery(SQL), the returned table schema is: Query result schema: [cnt: Long, tumTime: Timestamp].
Whereas using
JsonRowSchemaConverter.convert("{" +
"type:'object'," +
"properties:{" +
"cnt: {" +
"type: 'number'" +
"}," +
"tumTime:{" +
"type:'string'," +
"format:'date-time'" +
"}" +
"}" +
“}");
to create the Elasticsearch6UpsertTableSink, the sink's table schema is: TableSink schema: [cnt: BigDecimal, tumTime: Timestamp].
I also looked at JsonRowSchemaConverter.convert: it converts every numeric type to BigDecimal,
so the schema returned by the SQL query can never match the schema defined by the JSON.

Is this a problem with how I am using it, or an issue in the framework?

Source code attached:

public class AggregationFunction {



public static void main(String[] args) {
String sql = "SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL 
'10' SECOND) as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' 
SECOND)";
StreamExecutionEnvironment senv = 
StreamExecutionEnvironment.getExecutionEnvironment();
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
DataStream source = senv.addSource(new SourceFunction() {
@Override
public void run(SourceContext sourceContext) throws Exception 
{
int i = 1000;
String[] names = {"Hanmeimei", "Lilei"};
while (i > 1) {
sourceContext.collect(new User(names[i%2], i, new Timestamp(System.currentTimeMillis())));
Thread.sleep(10);
i--;
}
}
@Override
public void cancel() {

}
});
tenv.registerDataStream("abc", source, "name, age, timestamp, 
rowtime.rowtime");
Table table = tenv.sqlQuery(sql);
List hosts = Arrays.asList(new Host("10.20.128.210", 19201, 
"http"));
TypeInformation typeInformation = 
JsonRowSchemaConverter.convert("{" +
"type:'object'," +
"properties:{" +
"cnt: {" +
"type: 'number'" +
"}," +
"tumTime:{" +
"type:'string'," +
"format:'date-time'" +
"}" +
"}" +
"}");
RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
TypeInformation[] typeInformations = typeInfo.getFieldTypes();

String[] fieldNames = typeInfo.getFieldNames();
TableSchema.Builder builder = TableSchema.builder();
for (int i = 0; i < typeInformations.length; i ++) {
builder.field(fieldNames[i], typeInformations[i]);
}
Elasticsearch6UpsertTableSink establesink = new 
Elasticsearch6UpsertTableSink(
true,
builder.build(),
hosts,
"aggregation",
"data",
"$",
"n/a",
new JsonRowSerializationSchema.Builder(typeInformation).build(),
XContentType.JSON,
new IgnoringFailureHandler(),
new HashMap<>()
);
tenv.registerTableSink("aggregationTableSink", establesink);
table.insertInto("aggregationTableSink");
}


@Data
@AllArgsConstructor
@NoArgsConstructor
public static class User {
private String name;

private Integer age;

private Timestamp timestamp;
}


}
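
One workaround I have not tried yet would be to build the sink schema and serialization
schema explicitly so their types match the query result, instead of deriving them from
the JSON schema; a rough, untested sketch:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

TableSchema sinkSchema = TableSchema.builder()
        .field("cnt", Types.LONG)              // count(*) produces a Long, not a BigDecimal
        .field("tumTime", Types.SQL_TIMESTAMP)
        .build();

SerializationSchema<Row> serSchema =
        new JsonRowSerializationSchema.Builder(sinkSchema.toRowType()).build();
// then pass sinkSchema and serSchema to the Elasticsearch6UpsertTableSink constructor
// instead of the JsonRowSchemaConverter-derived ones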



best wish!


Fwd: How to reprocess certain events in Flink?

2019-12-17 Thread Pooja Agrawal
Hi,

I have a use case where we are reading events from a Kinesis stream. The event
can look like this:
Event {
  event_id,
  transaction_id
  key1,
  key2,
  value,
  timestamp,
  some other fields...
}

We want to aggregate the values per key for all events we have seen till
now (as simple as "select key1, key2, sum(value) from events group by key1,
key2"). For this I have created a simple Flink job which uses the
flink-kinesis connector and applies keyBy() and sum() on the incoming
events. I am facing two challenges here:

1) The incoming events can have duplicates. How do I maintain exactly-once
processing here, since processing duplicate events can give me an erroneous
result? The field transaction_id will be unique for each event. If two
events have the same transaction_id, we can say that they are duplicates. (By
duplicates here, I don't just mean the retried ones; the same message can
be present in Kinesis with a different sequence number. I am not sure if the
flink-kinesis connector can handle that, as it uses the KCL underneath,
which I assume doesn't take care of it.)

2) There can be cases where the value has been updated for a key
after the event was processed, and we may want to reprocess those events with
the new value. Since this is just a value change, the transaction_id will be
the same, so the idempotency logic will not allow the events to be reprocessed.
What are the ways to handle such scenarios in Flink?

Thanks
Pooja


-- 
Warm Regards,
Pooja Agrawal


Re: S3A "Data read has a different length than the expected" issue root cause

2019-12-17 Thread Kostas Kloudas
Thanks a lot for reporting this!

I believe that this can be really useful for the community!

Cheers,
Kostas

On Tue, Dec 17, 2019 at 1:29 PM spoganshev  wrote:
>
> In case you experience an exception similar to the following:
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Data
> read has a different length than the expected: dataLength=53562;
> expectedLength=65536; includeSkipped=true; in.getClass()=class
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client$2;
> markedSupported=false; marked=0; resetSinceLastMarked=false; markCount=0;
> resetCount=0
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.util.LengthCheckInputStream.checkLength(LengthCheckInputStream.java:151)
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:93)
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:76)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInputStream.closeStream(S3AInputStream.java:529)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:490)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopDataInputStream.close(HadoopDataInputStream.java:89)
> at
> org.apache.flink.api.common.io.FileInputFormat.close(FileInputFormat.java:861)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:206)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>
>
> The root cause is a bug in Hadoop's S3A filesystem implementation:
> https://issues.apache.org/jira/browse/HADOOP-16767
>
> A temporary hacky workaround is to replace S3AInputStream class and all the
> classes that it requires and use it in a custom filesystem implementation.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


S3A "Data read has a different length than the expected" issue root cause

2019-12-17 Thread spoganshev
In case you experience an exception similar to the following:

org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Data
read has a different length than the expected: dataLength=53562;
expectedLength=65536; includeSkipped=true; in.getClass()=class
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client$2;
markedSupported=false; marked=0; resetSinceLastMarked=false; markCount=0;
resetCount=0
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.util.LengthCheckInputStream.checkLength(LengthCheckInputStream.java:151)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:93)
at
org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:76)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInputStream.closeStream(S3AInputStream.java:529)
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:490)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at
org.apache.flink.fs.s3.common.hadoop.HadoopDataInputStream.close(HadoopDataInputStream.java:89)
at
org.apache.flink.api.common.io.FileInputFormat.close(FileInputFormat.java:861)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:206)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


The root cause is a bug in Hadoop's S3A filesystem implementation:
https://issues.apache.org/jira/browse/HADOOP-16767

A temporary, hacky workaround is to replace the S3AInputStream class (and all the
classes it requires) and use it in a custom filesystem implementation.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink slot utilization

2019-12-17 Thread Robert Metzger
Hi,

1) By default, Flink's Kafka connector polls Kafka every 100 ms. There's a
configuration key, "flink.poll-timeout", to change that frequency. I don't have
experience with these internal log messages from Kafka, but since they are on INFO
level (and you don't see any unexpected data), I would ignore them for now.
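
If you are using Flink's FlinkKafkaConsumer directly, the poll timeout is set on the same
Properties object that is passed to the consumer, e.g. (topic, servers and group id below
are placeholders; the value is in milliseconds):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
// how long each poll waits when no data is available (default is 100 ms)
props.setProperty("flink.poll-timeout", "500");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);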

2) The slots do not reserve memory. A slot is basically a thread running
on the TaskManager, and you can't enforce the amount of memory available to
a thread, so all slots share the TaskManager's pool of available memory.
If you want to run multiple low-throughput pipelines on Flink, it is not a
problem to oversubscribe your TaskManagers. For a machine with, say, 8 cores
and 16 GB of memory, you could configure 100 or even 500 slots if they
are not very resource intensive.
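
For illustration, that is a single flink-conf.yaml entry on each TaskManager
(the value below is only an example, size it to your workload):

taskmanager.numberOfTaskSlots: 100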

With StateFun, you can have millions of actors on a TaskManager. If they
are not receiving any data, they won't allocate resources.

Best,
Robert


On Tue, Dec 17, 2019 at 11:37 AM Andrés Garagiola 
wrote:

> Thanks Robert,
>
>
> About your questions, I don't have yet a real estimation regarding the
> number of records received by the pipeline but I guess that the pipeline
> could be idle for several minutes (I don't think that for hours).
>
>
> My concern comes to me from two aspects:
>
>
> 1) I saw multiple lines in the Flink task manager logs like the ones
> listed below. Sounds like if the pipeline is doing polling over the Kafka
> topic source, I don't know if I can control this behavior in some way to
> reduce the CPU consumption when I can tolerate some latency.
>
>
> *2019-12-17 05:25:56,720 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Resetting offset for partition test-topic-0 to offset 561.*
>
> *2019-12-17 05:25:56,720 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Seeking to LATEST offset of partition test-topic-0*
>
> *2019-12-17 05:25:56,721 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Resetting offset for partition test-topic-0 to offset 561.*
>
> *2019-12-17 05:25:56,721 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Seeking to LATEST offset of partition test-topic-0*
>
> *2019-12-17 05:25:56,722 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Resetting offset for partition test-topic-0 to offset 561.*
>
>
> 2) I read that every slot reserves a portion of the task manager's memory,
> so I would like to reuse that memory between multiple pipelines (again in
> the context where some latency is allowed). I understand that this is not
> possible in the current state of Flink but would be possible by avoiding
> the direct map with Statefun, isn't it?
>
>
> Thanks again for your reply.
>
> Regards
>
> On Tue, Dec 17, 2019 at 11:10 AM Robert Metzger 
> wrote:
>
>> Hi Andrés,
>>
>> sorry for the late reply.
>> 1. The slots are released, when the streaming pipeline ends. In
>> principle, it is not a problem when a slot is allocated, even when not
>> processing any incoming messages. So you are not doing something wrong. How
>> many records do you receive per pipeline? (are they idle for multiple
>> hours?)
>> There's a way to utilize the slots more efficiently: https://statefun.io/ 
>> Statefun
>> will be contributed to Flink soon.
>> StateFun doesn't have a direct slots to pipeline mapping.
>>
>> 2. The memory consumption per slot greatly depends on what kind of
>> operator you are running in it. A heap statebackend might need a few
>> gigabytes, a stateless mapper needs almost no memory. Some time ago, I
>> wrote a blog post on sizing a Flink cluster:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>>
>> Best,
>> Robert
>>
>>
>> On Fri, Dec 13, 2019 at 5:06 PM Andrés Garagiola <
>> andresgaragi...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I'm testing Flink to do stream processing, in my use case there are
>>> multiples pipelines processing messages from multiple Kafka sources. I have
>>> some questions regarding the jobs and slots.
>>>
>>> 1) When I deploy a new job, it takes a job slot in the TM, the job never
>>> ends (I think it doesn't end because is a stream pipeline), and the slot is
>>> never released, this means that the slot is busy even when no new messages
>>> are coming from the Kafka topic. Is that OK or I'm doing something wrong?
>>> Is there a way to do a more efficient utilization of the job slots?
>>>
>>> 2) In my use case, I need good job scalability. Potentially I could have
>>> 

Re: Scala ListBuffer cannot be used as a POJO type in Flink

2019-12-17 Thread Andrey Zagrebin
Hi Utopia,

There were already a couple of hints about immutability in the comments to your
Stack Overflow question.
In general, I would also recommend this, because when you work with Flink state
the general API contract is that if you update your state object
(schoolDescriptor) you have to call state#update with it.
This might work for heap state without calling update (not always guaranteed by
the API) but will not work e.g. for the RocksDB state backend.
The serialisation is also much easier if you use pure POJOs [1].

In your non-POJO case, the general approach is to implement a custom
org.apache.flink.api.common.typeutils.TypeSerializer, or to register a custom
serialiser [2] and use the other state descriptor constructor:
ListStateDescriptor(String name, TypeSerializer typeSerializer),
or to refactor your classes to support out-of-the-box serialisation [3].
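
For illustration only, here is a rough, uncompiled sketch of the "refactor"
route using immutable Scala case classes, so the Scala type extractor can
derive the serialisers (I renamed Class to SchoolClass to avoid clashing with
java.lang.Class; context.globalState is taken from your snippet):

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.scala._  // provides createTypeInformation
import scala.collection.JavaConverters._

// Immutable case classes: no setters needed, and Seq fields are handled
// by the Scala TypeInformation derivation.
case class Student(name: String)
case class SchoolClass(students: Seq[Student] = Seq.empty)
case class School(classes: Seq[SchoolClass] = Seq.empty)

lazy val schoolDescriptor =
  new ListStateDescriptor[School]("schoolDescriptor", createTypeInformation[School])

// ListState#update expects a java.util.List; replace the whole value rather
// than mutating it in place.
context.globalState.getListState(schoolDescriptor)
  .update(List(School(Seq(SchoolClass()))).asJava)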

Best,
Andrey

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-for-pojo-types
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html

> On 16 Dec 2019, at 08:42, Utopia  wrote:
> 
> Hello everyone,
> 
> When I run the code below. The log print:
> 
> > class scala.collection.mutable.ListBuffer does not contain a setter for 
> > field scala$collection$mutable$ListBuffer$$start
> 
> > Class class scala.collection.mutable.ListBuffer cannot be used as a POJO 
> > type because not all fields are valid POJO fields, and must be processed as 
> > GenericType.  
>  
> Code:
> 
> private lazy val schoolDescriptor = new 
> ListStateDescriptor[School]("schoolDescriptor", classOf[School])
> 
> 
> context.globalState.getListState(schoolDescriptor).update(ListBuffer(new
> School))
> 
> Class define:
> 
> class School {
>   var classes: ListBuffer[Class] = ListBuffer()
> }
> 
> class Class {
>   var students: ListBuffer[Student] = ListBuffer()
> }
> 
> class Student {
>   var name = ""
> }
> 
> What should I do if a POJO has a ListBuffer-typed field, and the element of 
> the ListBuffer also has a ListBuffer-typed field?
> 
> https://stackoverflow.com/q/59352295/4388077 
> 
> 
> 
> Best  regards
> Utopia



Re: Flink slot utilization

2019-12-17 Thread Andrés Garagiola
Thanks Robert,


About your questions, I don't yet have a real estimate of the number of
records received by the pipeline, but I guess that the pipeline could be
idle for several minutes (though I don't think for hours).


My concern comes from two aspects:


1) I saw multiple lines in the Flink task manager logs like the ones listed
below. It sounds like the pipeline is polling the Kafka topic source, and I
don't know if I can control this behavior in some way to reduce CPU
consumption when I can tolerate some latency.


*2019-12-17 05:25:56,720 INFO
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
Resetting offset for partition test-topic-0 to offset 561.*

*2019-12-17 05:25:56,720 INFO
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
Seeking to LATEST offset of partition test-topic-0*

*2019-12-17 05:25:56,721 INFO
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
Resetting offset for partition test-topic-0 to offset 561.*

*2019-12-17 05:25:56,721 INFO
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
Seeking to LATEST offset of partition test-topic-0*

*2019-12-17 05:25:56,722 INFO
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
Resetting offset for partition test-topic-0 to offset 561.*


2) I read that every slot reserves a portion of the task manager's memory,
so I would like to reuse that memory across multiple pipelines (again in a
context where some latency is allowed). I understand that this is not
possible in the current state of Flink, but it would be possible by avoiding
the direct mapping with Statefun, wouldn't it?


Thanks again for your reply.

Regards

On Tue, Dec 17, 2019 at 11:10 AM Robert Metzger  wrote:

> Hi Andrés,
>
> sorry for the late reply.
> 1. The slots are released, when the streaming pipeline ends. In principle,
> it is not a problem when a slot is allocated, even when not processing any
> incoming messages. So you are not doing something wrong. How many records
> do you receive per pipeline? (are they idle for multiple hours?)
> There's a way to utilize the slots more efficiently: https://statefun.io/ 
> Statefun
> will be contributed to Flink soon.
> StateFun doesn't have a direct slots to pipeline mapping.
>
> 2. The memory consumption per slot greatly depends on what kind of
> operator you are running in it. A heap statebackend might need a few
> gigabytes, a stateless mapper needs almost no memory. Some time ago, I
> wrote a blog post on sizing a Flink cluster:
> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>
> Best,
> Robert
>
>
> On Fri, Dec 13, 2019 at 5:06 PM Andrés Garagiola <
> andresgaragi...@gmail.com> wrote:
>
>> Hi
>>
>> I'm testing Flink to do stream processing, in my use case there are
>> multiples pipelines processing messages from multiple Kafka sources. I have
>> some questions regarding the jobs and slots.
>>
>> 1) When I deploy a new job, it takes a job slot in the TM, the job never
>> ends (I think it doesn't end because is a stream pipeline), and the slot is
>> never released, this means that the slot is busy even when no new messages
>> are coming from the Kafka topic. Is that OK or I'm doing something wrong?
>> Is there a way to do a more efficient utilization of the job slots?
>>
>> 2) In my use case, I need good job scalability. Potentially I could have
>> many pipelines running in the Flink environment, but on the other hand,
>> increase latency would not be a serious problem for me. There are some
>> recommendations regarding memory for slot? I saw that the CPU
>> recommendation is a core per slot, taking into account that increase the
>> latency would not be a big problem, do you see another good reason to
>> follow this recommendation?
>>
>> Thank you
>> Regards
>>
>


Re: sink type error in scala

2019-12-17 Thread Timo Walther

Hi Fanbin,

I think you are mixing different APIs together. We have Scala and Java 
versions of both the DataStream and Table APIs. The error message indicates 
that `toRetractStream` is called on a Java Table API class, because it 
returns org.apache.flink.api.java.tuple.Tuple2, while your sink is 
implemented in Scala with Scala tuple syntax like `(Boolean, Row)`.


Make sure you are using `org.apache.flink.table.api.scala._` instead of 
`org.apache.flink.table.api.java._`.
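
For example (uncompiled sketch; tableEnv, queryResult and MyAwesomeSink are the 
names from your snippet, and tableEnv is assumed to be the Scala 
org.apache.flink.table.api.scala.StreamTableEnvironment):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._  // Scala Table API conversions
import org.apache.flink.types.Row

// The Scala variant returns DataStream[(Boolean, Row)], which matches a
// RichSinkFunction[(Boolean, Row)] written with Scala tuples.
val retractStream = tableEnv.toRetractStream[Row](queryResult)
retractStream.addSink(new MyAwesomeSink())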


I hope this helps.

Regards,
Timo


On 15.12.19 03:25, Fanbin Bu wrote:

Hi

I have my sink defined as:
class MyAwesomeSink() extends RichSinkFunction[(Boolean, Row)] {
...
}

But the compiler complains when I use it like:
val sink = new MyAwesomeSink()
tableEnv.toRetractStream(queryResult, classOf[Row]).addSink(sink)

  found   : MyAwesomeSink
  required: 
org.apache.flink.streaming.api.functions.sink.SinkFunction[org.apache.flink.api.java.tuple.Tuple2[Boolean,org.apache.flink.types.Row]]



I'm using Flink 1.9 with blink.
I tried
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
and it doesn't work.


Any ideas?

Thanks,
Fanbin






Re: [ANNOUNCE] Weekly Community Update 2019/50

2019-12-17 Thread Hequn Cheng
Cool. I will do it in the next three weeks.
Thanks a lot for your continued great work!

Best, Hequn

On Tue, Dec 17, 2019 at 6:16 PM Konstantin Knauf 
wrote:

> Hi Hequn,
>
> thanks, and thanks for the offer. Of course, you can cover the holiday
> break, i.e. the next three weeks. Looking forward to your updates!
>
> Cheers,
>
> Konstantin
>
> On Mon, Dec 16, 2019 at 5:53 AM Hequn Cheng  wrote:
>
>> Hi Konstantin,
>>
>> Happy holidays and thanks a lot for your great job on the updates
>> continuously.
>> With the updates, it is easier for us to catch up with what's going on in
>> the community, which I think is quite helpful.
>>
>> I'm wondering if I can help out and cover this during your vacation.
>> :)
>>
>> Best,
>> Hequn
>>
>> On Sun, Dec 15, 2019 at 11:36 PM Konstantin Knauf <
>> konstan...@ververica.com> wrote:
>>
>>> Dear community,
>>>
>>> happy to share this week's brief community digest with updates on Flink
>>> 1.8.3 and Flink 1.10, a discussion on how to facilitate easier Flink/Hive
>>> setups, a couple of blog posts and a bit more.
>>>
>>> *Personal Note:* Thank you for reading these updates since I started
>>> them early this year. I will take a three week Christmas break and will be
>>> back with a Holiday season community update on the 12th of January.
>>>
>>> Flink Development
>>> ==
>>>
>>> * [releases] Apache Flink 1.8.3 was released on Wednesday. [1,2]
>>>
>>> * [releases] The feature freeze for Apache Flink took place on Monday.
>>> The community is now working on testing, bug fixes and improving the
>>> documentation in order to create a first release candidate soon. [3]
>>>
>>> * [development process] Seth has revived the discussion on a past PR by
>>> Marta, which added a documentation style guide to the contributor guide.
>>> Please check it [4] out, if you are contributing documentation to Apache
>>> Flink. [5]
>>>
>>> * [security] Following a recent report to the Flink PMC of "exploiting"
>>> the Flink Web UI for remote code execution, Robert has started a discussion
>>> on how to improve the tooling/documentation to make users aware of this
>>> possibility and recommend securing this interface in production setups. [6]
>>>
>>> * [sql] Bowen has started a discussion on how to simplify the Flink-Hive
>>> setup for new users as currently users need to add some additional
>>> dependencies to the classpath manually. The discussion seems to conclude
>>> towards providing a single additional hive-uber jar, which contains all the
>>> required dependencies. [7]
>>>
>>> [1] https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-8-3-released-tp35868.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Apache-Flink-1-10-0-release-tp35139.html
>>> [4] https://github.com/apache/flink-web/pull/240
>>> [5]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Docs-Style-Guide-Review-tp35758.html
>>> [6]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-documentation-tooling-around-security-of-Flink-tp35898.html
>>> [7]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-have-separate-Flink-distributions-with-built-in-Hive-dependencies-tp35918.html
>>>
>>> Notable Bugs
>>> ==
>>>
>>> [FLINK-15152] [1.9.1] When a "stop" action on a job fails, because not
>>> all tasks are in "RUNNING" state the job is not checkpointing afterwards.
>>> [8]
>>>
>>> [8] https://issues.apache.org/jira/browse/FLINK-15152
>>>
>>> Events, Blog Posts, Misc
>>> ===
>>>
>>> * Zhu Zhu is now an Apache Flink Committer. Congratulations! [9]
>>>
>>> * Gerred Dillon has published a blog post on the Apache Flink blog on
>>> how to run Flink on Kubernetes with a KUDO Flink operator. [10]
>>>
>>> * In this blog post Apache Flink PMC Sun Jincheng outlines the reasons
>>> and motivation for his and his colleague's work to provide a world-class
>>> Python support for Apache Flink's Table API. [11]
>>>
>>> * Upcoming Meetups
>>> * On December 17th there will be the second Apache Flink meetup in
>>> Seoul. [12] *Dongwon* has shared a detailed agenda in last week's
>>> community update. [13]
>>> * On December 18th Alexander Fedulov will talk about Stateful Stream
>>> Processing with Apache Flink at the Java Professionals Meetup in Minsk. [14]
>>>
>>> [9]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Zhu-Zhu-becomes-a-Flink-committer-tp35944.html
>>> [10] https://flink.apache.org/news/2019/12/09/flink-kubernetes-kudo.html
>>> [11]
>>> https://developpaper.com/why-will-apache-flink-1-9-0-support-the-python-api/
>>> [12] https://www.meetup.com/Seoul-Apache-Flink-Meetup/events/266824815/
>>> [13]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Weekly-Community-Update-2019-48-td35423.html
>>> [14] 

Re: [ANNOUNCE] Weekly Community Update 2019/50

2019-12-17 Thread Konstantin Knauf
Hi Hequn,

thanks, and thanks for the offer. Of course, you can cover the holiday
break, i.e. the next three weeks. Looking forward to your updates!

Cheers,

Konstantin

On Mon, Dec 16, 2019 at 5:53 AM Hequn Cheng  wrote:

> Hi Konstantin,
>
> Happy holidays and thanks a lot for your great job on the updates
> continuously.
> With the updates, it is easier for us to catch up with what's going on in
> the community, which I think is quite helpful.
>
> I'm wondering if I can help out and cover this during your vacation. :)
>
> Best,
> Hequn
>
> On Sun, Dec 15, 2019 at 11:36 PM Konstantin Knauf <
> konstan...@ververica.com> wrote:
>
>> Dear community,
>>
>> happy to share this week's brief community digest with updates on Flink
>> 1.8.3 and Flink 1.10, a discussion on how to facilitate easier Flink/Hive
>> setups, a couple of blog posts and a bit more.
>>
>> *Personal Note:* Thank you for reading these updates since I started
>> them early this year. I will take a three week Christmas break and will be
>> back with a Holiday season community update on the 12th of January.
>>
>> Flink Development
>> ==
>>
>> * [releases] Apache Flink 1.8.3 was released on Wednesday. [1,2]
>>
>> * [releases] The feature freeze for Apache Flink took place on Monday.
>> The community is now working on testing, bug fixes and improving the
>> documentation in order to create a first release candidate soon. [3]
>>
>> * [development process] Seth has revived the discussion on a past PR by
>> Marta, which added a documentation style guide to the contributor guide.
>> Please check it [4] out, if you are contributing documentation to Apache
>> Flink. [5]
>>
>> * [security] Following a recent report to the Flink PMC of "exploiting"
>> the Flink Web UI for remote code execution, Robert has started a discussion
>> on how to improve the tooling/documentation to make users aware of this
>> possibility and recommend securing this interface in production setups. [6]
>>
>> * [sql] Bowen has started a discussion on how to simplify the Flink-Hive
>> setup for new users as currently users need to add some additional
>> dependencies to the classpath manually. The discussion seems to conclude
>> towards providing a single additional hive-uber jar, which contains all the
>> required dependencies. [7]
>>
>> [1] https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-8-3-released-tp35868.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Apache-Flink-1-10-0-release-tp35139.html
>> [4] https://github.com/apache/flink-web/pull/240
>> [5]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Docs-Style-Guide-Review-tp35758.html
>> [6]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-documentation-tooling-around-security-of-Flink-tp35898.html
>> [7]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-have-separate-Flink-distributions-with-built-in-Hive-dependencies-tp35918.html
>>
>> Notable Bugs
>> ==
>>
>> [FLINK-15152] [1.9.1] When a "stop" action on a job fails, because not
>> all tasks are in "RUNNING" state the job is not checkpointing afterwards.
>> [8]
>>
>> [8] https://issues.apache.org/jira/browse/FLINK-15152
>>
>> Events, Blog Posts, Misc
>> ===
>>
>> * Zhu Zhu is now an Apache Flink Committer. Congratulations! [9]
>>
>> * Gerred Dillon has published a blog post on the Apache Flink blog on how
>> to run Flink on Kubernetes with a KUDO Flink operator. [10]
>>
>> * In this blog post Apache Flink PMC Sun Jincheng outlines the reasons
>> and motivation for his and his colleague's work to provide a world-class
>> Python support for Apache Flink's Table API. [11]
>>
>> * Upcoming Meetups
>> * On December 17th there will be the second Apache Flink meetup in
>> Seoul. [12] *Dongwon* has shared a detailed agenda in last week's
>> community update. [13]
>> * On December 18th Alexander Fedulov will talk about Stateful Stream
>> Processing with Apache Flink at the Java Professionals Meetup in Minsk. [14]
>>
>> [9]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Zhu-Zhu-becomes-a-Flink-committer-tp35944.html
>> [10] https://flink.apache.org/news/2019/12/09/flink-kubernetes-kudo.html
>> [11]
>> https://developpaper.com/why-will-apache-flink-1-9-0-support-the-python-api/
>> [12] https://www.meetup.com/Seoul-Apache-Flink-Meetup/events/266824815/
>> [13]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Weekly-Community-Update-2019-48-td35423.html
>> [14] https://www.meetup.com/Apache-Flink-Meetup-Minsk/events/267134296/
>>
>> Cheers,
>>
>> Konstantin (@snntrable)
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>>
>> Follow us @VervericaData Ververica 
>>
>>
>> --
>>
>> 

Re: FLINK 1.9 + YARN + SessionWindows + large data volume + OOM after running for a while

2019-12-17 Thread Xintong Song
This is not actually an OOM: the container exceeded its memory budget and was killed by YARN.
JVM memory cannot be overused (that would raise an OOM error), so the overuse is most likely caused by RocksDB's memory usage growing over time.

Suggestions:

1. Add the following configuration:
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: false

2. If you have already applied the configuration above, or the problem persists after changing it, try increasing the option below (the default value when unset is 0.25); see the flink-conf.yaml sketch after this list:
containerized.heap-cutoff-ratio
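
For reference, a minimal flink-conf.yaml sketch combining both suggestions
(the 0.4 below is only an illustrative value, tune it for your job):

taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: false
containerized.heap-cutoff-ratio: 0.4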

Thank you~

Xintong Song



On Tue, Dec 17, 2019 at 5:49 PM USERNAME  wrote:

> Version: Flink 1.9.1
> -- Run command
> flink run -d -m yarn-cluster -yn 40 -ys 2 
>
>
> -- Partial code
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH, true);
>
>
> .keyBy("imei")  // 100K+ keys
> .window(EventTimeSessionWindows.withGap(Time.hours(1))) // a device is considered offline after 1 hour without a data point
> .trigger(new Trigger())
> .aggregate(new AggregateFunction(), new ProcessWindowFunction())
>
>
> -- Data
> 100K+ devices in total, one record per device every 30 seconds, about 200K records per minute.
>
>
> -- Symptom
> After running for a while (a few days), the TaskManager dies.
>
>
> -- Questions
> 1. Does Flink memory keep growing?
> The data volume is quite large and windows may be kept around for a long time, but each record only needs to be processed once, and I have also configured StateTtlConfig. I don't know where or what keeps the memory occupied; my usage may be wrong, and I would appreciate some guidance.
> 2. Can the Flink / YARN configuration be tuned?
> Are there best practices for configuring Flink on YARN?
>
>
> This problem has been bothering me for a long time, through 1.7, 1.8 and 1.9. I hope someone familiar with the internals, or who has run into a similar problem, can advise.
>
>
>
> -- Error message --> nodemanager.log
>
>
> 2019-12-17 16:55:16,545 WARN
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Process tree for container: container_e16_1575354121024_0050_01_08 has
> processes older than 1 iteration running over the configured limit.
> Limit=3221225472, current usage = 3222126592
> 2019-12-17 16:55:16,546 WARN
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Container
> [pid=184523,containerID=container_e16_1575354121024_0050_01_08] is
> running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0 GB
> of 3 GB physical memory used; 4.9 GB of 30 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_e16_1575354121024_0050_01_08 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279
> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
> -Dlogback.configurationFile=file:./logback.xml
> -Dlog4j.configuration=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
> |- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c
> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
> -Dlogback.configurationFile=file:./logback.xml
> -Dlog4j.configuration=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1>
> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out
> 2>
> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err
>
>
>
> 2019-12-17 16:55:16,546 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Removed ProcessTree with root 184523
> 2019-12-17 16:55:16,547 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
> Container container_e16_1575354121024_0050_01_08 transitioned from
> RUNNING to KILLING
> 2019-12-17 16:55:16,549 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
> Cleaning up container container_e16_1575354121024_0050_01_08
> 2019-12-17 16:55:16,579 WARN
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
> code from container container_e16_1575354121024_0050_01_08 is : 143


Re: Flink slot utilization

2019-12-17 Thread Robert Metzger
Hi Andrés,

sorry for the late reply.
1. The slots are released when the streaming pipeline ends. In principle,
it is not a problem for a slot to stay allocated even when it is not processing
any incoming messages, so you are not doing something wrong. How many records
do you receive per pipeline? (Are they idle for multiple hours?)
There's a way to utilize the slots more efficiently: Statefun
(https://statefun.io/), which will be contributed to Flink soon.
StateFun doesn't have a direct slot-to-pipeline mapping.

2. The memory consumption per slot greatly depends on what kind of operator
you are running in it. A heap state backend might need a few gigabytes, while a
stateless mapper needs almost no memory. Some time ago, I wrote a blog post
on sizing a Flink cluster:
https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

Best,
Robert


On Fri, Dec 13, 2019 at 5:06 PM Andrés Garagiola 
wrote:

> Hi
>
> I'm testing Flink to do stream processing, in my use case there are
> multiples pipelines processing messages from multiple Kafka sources. I have
> some questions regarding the jobs and slots.
>
> 1) When I deploy a new job, it takes a job slot in the TM, the job never
> ends (I think it doesn't end because is a stream pipeline), and the slot is
> never released, this means that the slot is busy even when no new messages
> are coming from the Kafka topic. Is that OK or I'm doing something wrong?
> Is there a way to do a more efficient utilization of the job slots?
>
> 2) In my use case, I need good job scalability. Potentially I could have
> many pipelines running in the Flink environment, but on the other hand,
> increase latency would not be a serious problem for me. There are some
> recommendations regarding memory for slot? I saw that the CPU
> recommendation is a core per slot, taking into account that increase the
> latency would not be a big problem, do you see another good reason to
> follow this recommendation?
>
> Thank you
> Regards
>


FLINK 1.9 + YARN + SessionWindows + large data volume + OOM after running for a while

2019-12-17 Thread USERNAME
Version: Flink 1.9.1
-- Run command
flink run -d -m yarn-cluster -yn 40 -ys 2 


-- Partial code
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH, true);


.keyBy("imei")  // 100K+ keys
.window(EventTimeSessionWindows.withGap(Time.hours(1))) // a device is considered offline after 1 hour without a data point
.trigger(new Trigger())
.aggregate(new AggregateFunction(), new ProcessWindowFunction())


-- Data
100K+ devices in total, one record per device every 30 seconds, about 200K records per minute.


-- Symptom
After running for a while (a few days), the TaskManager dies.


-- Questions
1. Does Flink memory keep growing?
The data volume is quite large and windows may be kept around for a long time, but each record only needs to be processed once, and I have also configured StateTtlConfig. I don't know where or what keeps the memory occupied; my usage may be wrong, and I would appreciate some guidance.
2. Can the Flink / YARN configuration be tuned?
Are there best practices for configuring Flink on YARN?


This problem has been bothering me for a long time, through 1.7, 1.8 and 1.9. I hope someone familiar with the internals, or who has run into a similar problem, can advise.




-- Error message --> nodemanager.log


2019-12-17 16:55:16,545 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Process tree for container: container_e16_1575354121024_0050_01_08 has 
processes older than 1 iteration running over the configured limit. 
Limit=3221225472, current usage = 3222126592
2019-12-17 16:55:16,546 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=184523,containerID=container_e16_1575354121024_0050_01_08] 
is running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0 GB of 
3 GB physical memory used; 4.9 GB of 30 GB virtual memory used. Killing 
container.
Dump of the process-tree for container_e16_1575354121024_0050_01_08 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279 
/usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m 
-XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC 
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly 
-XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError 
-Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 
|- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c 
/usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m 
-XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC 
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly 
-XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError 
-Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out
 2> 
/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err
 


2019-12-17 16:55:16,546 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Removed ProcessTree with root 184523
2019-12-17 16:55:16,547 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
 Container container_e16_1575354121024_0050_01_08 transitioned from RUNNING 
to KILLING
2019-12-17 16:55:16,549 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Cleaning up container container_e16_1575354121024_0050_01_08
2019-12-17 16:55:16,579 WARN 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code 
from container container_e16_1575354121024_0050_01_08 is : 143