Re: unaligned checkpoint for job with large start delay

2022-01-11 Thread Piotr Nowojski
Hi Thias and Mason,

> state-backend-rocksdb-metrics-estimate-num-keys

Indeed, that can be a good indicator. However, keep in mind that, depending
on your logic, there might be many existing windows for each key.

>  However, it’s not so clear how to count the windows that have been
registered since the window assigner does not expose the run time
context—is this even the right place to count?

Yes, I think you are unfortunately right. I've looked at the code, and it
wouldn't even be that easy to add such a metric. Sorry for misleading you.
But a spike in triggered windows is a strong indication that they were
triggered all at once.
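
A rough sketch of how such a count could be collected on the trigger side
(illustrative names only; this is not an existing Flink metric, just a wrapper
around whatever trigger is already in use):

```
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/** Wraps an existing trigger and counts every FIRE decision via a user metric. */
public class CountingTrigger<T, W extends Window> extends Trigger<T, W> {

    private final Trigger<T, W> delegate;
    private transient Counter firedCounter;

    public CountingTrigger(Trigger<T, W> delegate) {
        this.delegate = delegate;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        return count(delegate.onElement(element, timestamp, window, ctx), ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return count(delegate.onProcessingTime(time, window, ctx), ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return count(delegate.onEventTime(time, window, ctx), ctx);
    }

    @Override
    public boolean canMerge() {
        return delegate.canMerge();
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        delegate.onMerge(window, ctx);
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        delegate.clear(window, ctx);
    }

    private TriggerResult count(TriggerResult result, TriggerContext ctx) {
        if (firedCounter == null) {
            // registered once per subtask, visible under the window operator's metric group
            firedCounter = ctx.getMetricGroup().counter("windowsFired");
        }
        if (result.isFire()) {
            firedCounter.inc();
        }
        return result;
    }
}
```

A spike of such a counter at the time the checkpoint start delay grows would line up
with the "many windows fired at once" explanation.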

> Perhaps, it can be an opt-in feature? I do see it being really useful
since most users aren’t really familiar with windows and these metrics can
help easily identify the common problem of too many windows firing.
> The additional metrics certainly help in diagnosing some of the symptoms
of the root problem.

I will think about how to solve it. I would be against an opt-in metric, as
it would complicate the code and configuration for users while barely
anyone would use it.

Note that a huge checkpoint start delay with unaligned checkpoints already
confirms that the system has been blocked by something. As I mentioned
before, there are a number of possible reasons: record size larger than buffer
size, flatMap functions/operators multiplying the number of records, or a large
number of timers fired at once. Summing up everything that you have
reported so far, we ruled out the first two options, and the spike in the
number of triggered windows almost confirms that this is the issue at hand.
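
(For completeness, unaligned checkpoints themselves are enabled with a single
configuration switch; a minimal flink-conf.yaml sketch, where the interval value
is just an example:)

```
execution.checkpointing.interval: 60s
execution.checkpointing.unaligned: true
```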

Best,
Piotrek

On Wed, 12 Jan 2022 at 08:32, Schwalbe Matthias wrote:

> Hi Mason,
>
>
>
> Since you are using RocksDB, you could enable this metric [1]
> state-backend-rocksdb-metrics-estimate-num-keys which gives (afaik) a good
> indication of the number of active windows.
>
> I’ve never seen (despite the warning) a negative effect on the runtime.
>
>
>
> Hope this helps …
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-estimate-num-keys
>
>
>
> *From:* Mason Chen 
> *Sent:* Dienstag, 11. Januar 2022 19:20
> *To:* Piotr Nowojski 
> *Cc:* Mason Chen ; user 
> *Subject:* Re: unaligned checkpoint for job with large start delay
>
>
>
> Hi Piotrek,
>
>
>
> No worries—I hope you had a good break.
>
>
>
> Counting how many windows have been registered/fired and plotting that
> over time.
>
> It’s straightforward to count windows that are fired (the trigger exposes
> the run time context and we can collect the information in that code path).
> However, it’s not so clear how to count the windows that have been
> registered, since the window assigner does not expose the run time
> context—is this even the right place to count? It’s not necessarily the
> case that an assignment results in a new window being registered. Am I missing
> anything else relevant from the user-facing interface perspective?
>
>
>
>  Unfortunately at the moment I don't know how to implement such a metric
> without affecting performance on the critical path, so I don't see this
> happening soon :(
>
> Perhaps, it can be an opt-in feature? I do see it being really useful
> since most users aren’t really familiar with windows and these metrics can
> help easily identify the common problem of too many windows firing.
>
>
>
> The additional metrics certainly help in diagnosing some of the symptoms
> of the root problem.
>
>
>
> Best,
>
> Mason
>
>
>
> On Jan 10, 2022, at 1:00 AM, Piotr Nowojski  wrote:
>
>
>
> Hi Mason,
>
>
>
> Sorry for a late reply, but I was OoO.
>
>
>
> I think you could confirm it with more custom metrics. Counting how many
> windows have been registered/fired and plotting that over time.
>
>
>
> I think it would be more helpful in this case to check how long a task has
> been blocked being "busy" processing, for example, timers. FLINK-25414 shows
> only being blocked on hard/soft backpressure. Unfortunately at the moment I
> don't know how to implement such a metric without affecting performance on
> the critical path, so I don't see this happening soon :(
>
>
>
> Best,
>
> Piotrek
>
>
>
> On Tue, 4 Jan 2022 at 18:02, Mason Chen wrote:
>
> Hi Piotrek,
>
>
>
> In other words, something (presumably a watermark) has fired more than 151
> 200 windows at once, which is taking ~1h 10minutes to process and during
> this time the checkpoint can not make any progress. Is this number of
> triggered windows plausible in your scenario?
>
>
>
> It seems plausible—there are potentially many keys (and many windows). Is
> there a way to confirm with metrics? We can add a window fire counter to
> the window operator that only gets incremented at the end of window
> evaluation, in order to see the huge jumps in window fires. I can see this
> benefiting other users who troubleshoot the problem of a large number of
> windows firing.
>
>
>
> Best,
>
> Mason

Flink 1.12 Blink planner timestamp type conversion exception

2022-01-11 Thread 张健
Hi, all




Has anyone run into this problem with the Flink 1.12 Blink planner?


The simplified logic is:
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


In xxxRichMapFunction I set one field with row.setField(index, java.sql.Timestamp.valueOf(str));
i.e. the written value is of type java.sql.Timestamp, and the corresponding position in
getProducedType() of xxxRichMapFunction is also Types.SQL_TIMESTAMP.
But when I read that field in xxxRichSinkFunction it has become java.time.LocalDateTime,
which causes a java.lang.ClassCastException in JdbcUtils.setRecordToStatement.


The problem only occurs with the Blink planner; if the program is started with useOldPlanner() it does not occur.
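
(A minimal defensive-conversion sketch for the sink side, assuming the field may arrive
either as java.sql.Timestamp (old planner) or java.time.LocalDateTime (Blink planner);
the field position is only an example:)

```
import java.sql.Timestamp;
import java.time.LocalDateTime;
import org.apache.flink.types.Row;

public final class TimestampCompat {
    /** Returns the field as java.sql.Timestamp regardless of which planner produced it. */
    public static Timestamp toSqlTimestamp(Row row, int pos) {
        Object value = row.getField(pos);
        if (value instanceof Timestamp) {
            return (Timestamp) value;                         // old planner behaviour
        } else if (value instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) value);  // Blink planner behaviour
        }
        throw new IllegalArgumentException("Unexpected type: " + value.getClass());
    }
}
```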


张健




 





 

Re: JVM SEGV crash in 1.14.2 for scala 2.12

2022-01-11 Thread Eugene Chung
I tested lower versions like 1.12 and 1.13, but they all failed with the same
error.
Fortunately, it is okay with OpenJDK 11.0.12.
I think Flink should consider making Java 11 the minimum supported version.


Best regards,
Eugene Chung (Korean : 정의근)


On Wed, Jan 12, 2022 at 11:11 AM Eugene Chung 
wrote:

> Hi all,
>
> I downloaded 1.14.2 for scala 2.12 and executed a simple
> example, ./bin/flink run examples/streaming/WordCount.jar
>
> But in my environment, Mac OS Monterey with OpenJDK 8_312, the TaskManager
> JVM crashes with SEGV. Clearly, Unsafe class call in MemorySegment makes it
> happen.
>
> How can I resolve this? Please help.
>
> Here's some parts of hs_err.log below;
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x00010a1df399, pid=68596, tid=0xc903
> #
> # JRE version: OpenJDK Runtime Environment (8.0_312) (build
> 1.8.0_312-bre_2022_01_01_23_04-b00)
> # Java VM: OpenJDK 64-Bit Server VM (25.312-b00 mixed mode bsd-amd64
> compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x546399]
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please visit:
> #   https://github.com/Homebrew/homebrew-core/issues
> #
>
> ---  T H R E A D  ---
>
> Current thread (0x7fd498d8c000):  JavaThread "Keyed Aggregation -> Sink: Print to Std. Out (1/1)#0" [_thread_in_vm, id=51459, stack(0x76a1a000,0x76b1a000)]
>
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr:
> 0x
>
> ...
>
> Stack: [0x76a1a000,0x76b1a000],  sp=0x76b19290,  free space=1020k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> V  [libjvm.dylib+0x546399]
> J 2219  sun.misc.Unsafe.getInt(Ljava/lang/Object;J)I (0 bytes) @ 0x00011103b1ce [0x00011103b100+0xce]
> j  org.apache.flink.core.memory.MemorySegment.getInt(I)I+33
> j  org.apache.flink.core.memory.MemorySegment.getIntBigEndian(I)I+8
> j  org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInt()I+8
> j  org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(Lorg/apache/flink/core/io/IOReadableWritable;)Lorg/apache/flink/runtime/io/network/api/serialization/RecordDeserializer$DeserializationResult;+4
> j  org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(Lorg/apache/flink/core/io/IOReadableWritable;)Lorg/apache/flink/runtime/io/network/api/serialization/RecordDeserializer$DeserializationResult;+12
> j  org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(Lorg/apache/flink/core/io/IOReadableWritable;)Lorg/apache/flink/runtime/io/network/api/serialization/RecordDeserializer$DeserializationResult;+2
> j  org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(Lorg/apache/flink/streaming/runtime/io/PushingAsyncDataInput$DataOutput;)Lorg/apache/flink/streaming/runtime/io/DataInputStatus;+15
> j  org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput()Lorg/apache/flink/streaming/runtime/io/DataInputStatus;+8
> j  org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(Lorg/apache/flink/streaming/runtime/tasks/mailbox/MailboxDefaultAction$Controller;)V+4
> j  org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$615.runDefaultAction(Lorg/apache/flink/streaming/runtime/tasks/mailbox/MailboxDefaultAction$Controller;)V+5
> j  org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop()V+95
> j  org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop()V+4
> j  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()V+30
> j  org.apache.flink.runtime.taskmanager.Task$$Lambda$697.run()V+4
> j  org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Lorg/apache/flink/util/function/RunnableWithException;)V+4
> j  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Lorg/apache/flink/runtime/jobgraph/tasks/TaskInvokable;)V+71
> j  org.apache.flink.runtime.taskmanager.Task.doRun()V+728
> j  org.apache.flink.runtime.taskmanager.Task.run()V+1
> j  java.lang.Thread.run()V+11
> v  ~StubRoutines::call_stub
> V  [libjvm.dylib+0x2c3702]
> V  [libjvm.dylib+0x2c25af]
> V  [libjvm.dylib+0x2c279b]
> V  [libjvm.dylib+0x331dd2]
> V  [libjvm.dylib+0x52e033]
> V  [libjvm.dylib+0x52def1]
> V  [libjvm.dylib+0x465c76]
> C  [libsystem_pthread.dylib+0x64f4]  _pthread_start+0x7d
> C  [libsystem_pthread.dylib+0x200f]  thread_start+0xf
> C  0x
>
> ...
>
> ---  S Y S T E M  ---
>
> OS: Bsd
> uname: Darwin 21.2.0 Darwin Kernel 

RE: unaligned checkpoint for job with large start delay

2022-01-11 Thread Schwalbe Matthias
Hi Mason,

Since you are using RocksDB, you could enable this metric [1] 
state-backend-rocksdb-metrics-estimate-num-keys which gives (afaik) a good 
indication of the number of active windows.
I’ve never seen (despite the warning) a negative effect on the runtime.
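
(Enabling it is a single flink-conf.yaml switch; a minimal sketch, assuming the
RocksDB state backend is already configured:)

```
state.backend: rocksdb
state.backend.rocksdb.metrics.estimate-num-keys: true
```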

Hope this helps …

Thias




[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-estimate-num-keys

From: Mason Chen 
Sent: Dienstag, 11. Januar 2022 19:20
To: Piotr Nowojski 
Cc: Mason Chen ; user 
Subject: Re: unaligned checkpoint for job with large start delay

Hi Piotrek,

No worries—I hope you had a good break.

Counting how many windows have been registered/fired and plotting that over 
time.
It’s straightforward to count windows that are fired (the trigger exposes the 
run time context and we can collect the information in that code path). 
However, it’s not so clear how to count the windows that have been registered, 
since the window assigner does not expose the run time context—is this even the 
right place to count? It’s not necessarily the case that an assignment results 
in a new window being registered. Am I missing anything else relevant from the 
user-facing interface perspective?

 Unfortunately at the moment I don't know how to implement such a metric 
without affecting performance on the critical path, so I don't see this 
happening soon :(
Perhaps, it can be an opt-in feature? I do see it being really useful since 
most users aren’t really familiar with windows and these metrics can help 
easily identify the common problem of too many windows firing.

The additional metrics certainly help in diagnosing some of the symptoms of the 
root problem.

Best,
Mason


On Jan 10, 2022, at 1:00 AM, Piotr Nowojski wrote:

Hi Mason,

Sorry for a late reply, but I was OoO.

I think you could confirm it with more custom metrics. Counting how many 
windows have been registered/fired and plotting that over time.

I think it would be more helpful in this case to check how long a task has been 
blocked being "busy" processing, for example, timers. FLINK-25414 shows only 
being blocked on hard/soft backpressure. Unfortunately at the moment I don't 
know how to implement such a metric without affecting performance on the 
critical path, so I don't see this happening soon :(

Best,
Piotrek

On Tue, 4 Jan 2022 at 18:02, Mason Chen wrote:
Hi Piotrek,


In other words, something (presumably a watermark) has fired more than 151 200 
windows at once, which is taking ~1h 10minutes to process and during this time 
the checkpoint can not make any progress. Is this number of triggered windows 
plausible in your scenario?

It seems plausible—there are potentially many keys (and many windows). Is there 
a way to confirm with metrics? We can add a window fire counter to the window 
operator that only gets incremented at the end of window evaluation, in order 
to see the huge jumps in window fires. I can see this benefiting other users who 
troubleshoot the problem of a large number of windows firing.

Best,
Mason


On Dec 29, 2021, at 2:56 AM, Piotr Nowojski wrote:

Hi Mason,

> and it has to finish processing this output before checkpoint can begin—is 
> this right?

Yes. The checkpoint will only be executed once all triggered windows have been fully 
processed.

But from what you have posted it looks like all of that delay is coming from 
hundreds of thousands of windows firing all at the same time. Between 20:30 and 
~21:40 there must have been a bit more than 36 triggers/s * 60 s/min * 70 min = 
151 200 triggers fired at once (or in a very short interval). In other words, 
something (presumably a watermark) has fired more than 151 200 windows at once, 
which is taking ~1h 10 minutes to process and during this time the checkpoint 
can not make any progress. Is this number of triggered windows plausible in 
your scenario?

Best,
Piotrek


On Thu, 23 Dec 2021 at 12:12, Mason Chen wrote:
Hi Piotr,

Thanks for the thorough response and the PR—will review later.

Clarifications:
1. The flat map you refer to produces at most 1 record.
2. The session window operator’s window process function emits at least 1 
record.
3. The 25 ms sleep is at the beginning of the window process function.

Your explanation about how records being bigger than the buffer size can cause 
blockage makes sense to me. However, my average record size is around 770 bytes 
coming out of the source and 960 bytes coming out of the window. Also, we don’t 
override the default `taskmanager.memory.segment-size`. My Flink job memory 
config is as follows:

```
taskmanager.memory.jvm-metaspace.size: 512 mb
taskmanager.memory.jvm-overhead.max: 2Gb
taskmanager.memory.jvm-overhead.min: 512Mb
taskmanager.memory.managed.fraction: '0.4'
taskmanager.memory.network.fraction: '0.2'
taskmanager.memory.network.max: 2Gb

Flink 1.12 Blink planner timestamp type conversion exception

2022-01-11 Thread 张健
Hi, all




Has anyone run into this problem with the Flink 1.12 Blink planner?


The simplified logic is:
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


In xxxRichMapFunction I set one field with row.setField(index, java.sql.Timestamp.valueOf(str));
i.e. the written value is of type java.sql.Timestamp, and the corresponding position in
getProducedType() of xxxRichMapFunction is also Types.SQL_TIMESTAMP.
But when I read that field in xxxRichSinkFunction it has become java.time.LocalDateTime,
which causes a java.lang.ClassCastException in JdbcUtils.setRecordToStatement.


The problem only occurs with the Blink planner; if the program is started with useOldPlanner() it does not occur.


张健




 

Re: Serving Machine Learning models

2022-01-11 Thread Xingbo Huang
Hi Sonia,

As far as I know, PyFlink users prefer to use a Python UDF [1][2] for model
prediction: load the model when the UDF is initialized, and then predict on
each new piece of data.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/table/udfs/overview/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/operators/process_function/
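
(To illustrate the same load-once / score-per-record pattern in the Java DataStream
API — a rough sketch only; "Model" is a hypothetical stand-in for whatever
ONNX/PyTorch/TensorFlow runtime is used, and the model path is a placeholder:)

```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ModelScoringFunction extends RichMapFunction<float[], Float> {

    /** Hypothetical stand-in for a real inference runtime. */
    public interface Model {
        float predict(float[] features);
        static Model load(String path) {
            return features -> 0.0f;  // dummy implementation for the sketch
        }
    }

    private transient Model model;

    @Override
    public void open(Configuration parameters) {
        // Load once per parallel task, not once per record.
        model = Model.load("/path/to/model.onnx");  // placeholder path
    }

    @Override
    public Float map(float[] features) {
        return model.predict(features);
    }
}
```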

Best,
Xingbo

David Anderson wrote on Tue, Jan 11, 2022 at 03:39:

> Another approach that I find quite natural is to use Flink's Stateful
> Functions API [1] for model serving, and this has some nice advantages,
> such as zero-downtime deployments of new models, and the ease with which
> you can use Python. [2] is an example of this approach.
>
> [1] https://flink.apache.org/stateful-functions.html
> [2] https://github.com/ververica/flink-statefun-workshop
>
> On Fri, Jan 7, 2022 at 5:55 PM Yun Gao  wrote:
>
>> Hi Sonia,
>>
>> Sorry, I might not have statistics on the two provided methods. Perhaps as
>> input I could also offer another method: there is currently an eco-project,
>> dl-on-flink, that supports running DL frameworks on top of Flink; it
>> handles the data exchange between the Java and Python processes, which
>> allows using the native model directly.
>>
>> Best,
>> Yun
>>
>>
>> [1] https://github.com/flink-extended/dl-on-flink
>>
>>
>>
>> --
>> From:Sonia-Florina Horchidan 
>> Send Time:2022 Jan. 7 (Fri.) 17:23
>> To:user@flink.apache.org 
>> Subject:Serving Machine Learning models
>>
>> Hello,
>>
>>
>> I recently started looking into serving Machine Learning models for
>> streaming data in Flink. To give more context, that would involve training
>> a model offline (using PyTorch or TensorFlow), and calling it from inside a
>> Flink job to do online inference on newly arrived data. I have found
>> multiple discussions, presentations, and tools that could achieve this, and
>> it seems like the two alternatives would be: (1) wrap the pre-trained
>> models in a HTTP service (such as PyTorch Serve [1]) and let Flink do async
>> calls for model scoring, or (2) convert the models into a standardized
>> format (e.g., ONNX [2]), pre-load the model in memory for every task
>> manager (or use external storage if needed) and call it for each new data
>> point.
>>
>> Both approaches come with a set of advantages and drawbacks and, as far
>> as I understand, there is no "silver bullet", since one approach could be
>> more suitable than the other based on the application requirements.
>> However, I would be curious to know what would be the "recommended" methods
>> for model serving (if any) and what approaches are currently adopted by the
>> users in the wild.
>>
>> [1] https://pytorch.org/serve/
>>
>> [2] https://onnx.ai/
>>
>> Best regards,
>>
>> Sonia
>>
>>
>>
>> Sonia-Florina Horchidan
>> PhD Student
>> KTH Royal Institute of Technology
>> *Software and Computer Systems (SCS)*
>> School of Electrical Engineering and Computer Science (EECS)
>> Mobil: +46769751562
>> sf...@kth.se,  www.kth.se
>>
>>
>>


Unsubscribe

2022-01-11 Thread qhp...@hotmail.com
Unsubscribe



qhp...@hotmail.com


Re: Are sql-gateway and jdbc-driver still maintained?

2022-01-11 Thread godfrey he
Hi Ada,

It is indeed a pity that the sql-gateway was not kept maintained before.
Recently we have also noticed growing interest in batch, so the sql-gateway will continue to be maintained.

Btw, you are very welcome to share the pain points you ran into when replacing Spark with Flink; we will gradually address them.

Best,
Godfrey

Ada Wong wrote on Wed, Jan 12, 2022 at 10:09:
>
> cc tsreaper and Godfrey He
>
文末丶 <809097...@qq.com.invalid> wrote on Mon, Jan 10, 2022 at 19:39:
>
> >
> > Try https://github.com/DataLinkDC/dlink and see whether it meets your needs.
> >
> >
> >
> >
> > -- Original message --
> > From: "user-zh"
> > Sent: Monday, January 10, 2022, 7:32 PM
> > To: "user-zh"
> > Subject: Re: Are sql-gateway and jdbc-driver still maintained?
> >
> >
> >
> > https://github.com/ververica/flink-jdbc-driver
> > https://github.com/ververica/flink-sql-gateway
> >
> > Ada Wong wrote:
> >  I see these two projects have not been updated for one or two years. We want to completely replace Spark with Flink, and these two projects are a hard requirement, used to replace the Spark Thrift Server.


JVM SEGV crash in 1.14.2 for scala 2.12

2022-01-11 Thread Eugene Chung
Hi all,

I downloaded 1.14.2 for scala 2.12 and executed a simple
example, ./bin/flink run examples/streaming/WordCount.jar

But in my environment, Mac OS Monterey with OpenJDK 8_312, the TaskManager
JVM crashes with SEGV. Clearly, Unsafe class call in MemorySegment makes it
happen.

How can I resolve this? Please help.

Here's some parts of hs_err.log below;
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00010a1df399, pid=68596, tid=0xc903
#
# JRE version: OpenJDK Runtime Environment (8.0_312) (build
1.8.0_312-bre_2022_01_01_23_04-b00)
# Java VM: OpenJDK 64-Bit Server VM (25.312-b00 mixed mode bsd-amd64
compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x546399]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
#   https://github.com/Homebrew/homebrew-core/issues
#

---  T H R E A D  ---

Current thread (0x7fd498d8c000):  JavaThread "Keyed Aggregation -> Sink: Print to Std. Out (1/1)#0" [_thread_in_vm, id=51459, stack(0x76a1a000,0x76b1a000)]

siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr:
0x

...

Stack: [0x76a1a000,0x76b1a000],  sp=0x76b19290,  free space=1020k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
V  [libjvm.dylib+0x546399]
J 2219  sun.misc.Unsafe.getInt(Ljava/lang/Object;J)I (0 bytes) @ 0x00011103b1ce [0x00011103b100+0xce]
j  org.apache.flink.core.memory.MemorySegment.getInt(I)I+33
j  org.apache.flink.core.memory.MemorySegment.getIntBigEndian(I)I+8
j  org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInt()I+8
j  org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(Lorg/apache/flink/core/io/IOReadableWritable;)Lorg/apache/flink/runtime/io/network/api/serialization/RecordDeserializer$DeserializationResult;+4
j  org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(Lorg/apache/flink/core/io/IOReadableWritable;)Lorg/apache/flink/runtime/io/network/api/serialization/RecordDeserializer$DeserializationResult;+12
j  org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(Lorg/apache/flink/core/io/IOReadableWritable;)Lorg/apache/flink/runtime/io/network/api/serialization/RecordDeserializer$DeserializationResult;+2
j  org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(Lorg/apache/flink/streaming/runtime/io/PushingAsyncDataInput$DataOutput;)Lorg/apache/flink/streaming/runtime/io/DataInputStatus;+15
j  org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput()Lorg/apache/flink/streaming/runtime/io/DataInputStatus;+8
j  org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(Lorg/apache/flink/streaming/runtime/tasks/mailbox/MailboxDefaultAction$Controller;)V+4
j  org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$615.runDefaultAction(Lorg/apache/flink/streaming/runtime/tasks/mailbox/MailboxDefaultAction$Controller;)V+5
j  org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop()V+95
j  org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop()V+4
j  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()V+30
j  org.apache.flink.runtime.taskmanager.Task$$Lambda$697.run()V+4
j  org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Lorg/apache/flink/util/function/RunnableWithException;)V+4
j  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Lorg/apache/flink/runtime/jobgraph/tasks/TaskInvokable;)V+71
j  org.apache.flink.runtime.taskmanager.Task.doRun()V+728
j  org.apache.flink.runtime.taskmanager.Task.run()V+1
j  java.lang.Thread.run()V+11
v  ~StubRoutines::call_stub
V  [libjvm.dylib+0x2c3702]
V  [libjvm.dylib+0x2c25af]
V  [libjvm.dylib+0x2c279b]
V  [libjvm.dylib+0x331dd2]
V  [libjvm.dylib+0x52e033]
V  [libjvm.dylib+0x52def1]
V  [libjvm.dylib+0x465c76]
C  [libsystem_pthread.dylib+0x64f4]  _pthread_start+0x7d
C  [libsystem_pthread.dylib+0x200f]  thread_start+0xf
C  0x

...

---  S Y S T E M  ---

OS: Bsd
uname: Darwin 21.2.0 Darwin Kernel Version 21.2.0: Sun Nov 28 20:28:54 PST 2021; root:xnu-8019.61.5~1/RELEASE_X86_64 x86_64
rlimit: STACK 8192k, CORE 0k, NPROC 2784, NOFILE 10240, AS infinity
load average: 1.78 1.99 2.20

CPU: total 12 (initial active 12) (6 cores per cpu, 2 threads per core) family 6 model 158 stepping 10, cmov, cx8, fxsr, mmx, sse, sse2, sse3, ssse3, sse4.1, sse4.2, popcnt, avx, avx2, aes, clmul, erms, 3dnowpref, lzcnt, ht, tsc, tscinvbit, bmi1, bmi2, adx

Memory: 4k page, physical 16777216k(266056k free)

/proc/meminfo:


vm_info: OpenJDK 

Re: Are sql-gateway and jdbc-driver still maintained?

2022-01-11 Thread Ada Wong
cc tsreaper and Godfrey He

文末丶 <809097...@qq.com.invalid> wrote on Mon, Jan 10, 2022 at 19:39:

>
> Try https://github.com/DataLinkDC/dlink and see whether it meets your needs.
>
>
>
>
> -- Original message --
> From: "user-zh"
> Sent: Monday, January 10, 2022, 7:32 PM
> To: "user-zh"
> Subject: Re: Are sql-gateway and jdbc-driver still maintained?
>
>
>
> https://github.com/ververica/flink-jdbc-driver
> https://github.com/ververica/flink-sql-gateway
>
> Ada Wong wrote:
>  I see these two projects have not been updated for one or two years. We want to completely replace Spark with Flink, and these two projects are a hard requirement, used to replace the Spark Thrift Server.


Flink 1.12 Blink planner timestamp type conversion exception

2022-01-11 Thread 张健
Hi, all




Has anyone run into this problem with the Flink 1.12 Blink planner?


The simplified logic is:
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), 
Row.class).addSink(xxxRichSinkFunction);


In xxxRichMapFunction I set one field with row.setField(index, java.sql.Timestamp.valueOf(str));
i.e. the written value is of type java.sql.Timestamp, and the corresponding position in
getProducedType() of xxxRichMapFunction is also Types.SQL_TIMESTAMP.
But when I read that field in xxxRichSinkFunction it has become java.time.LocalDateTime,
which causes a java.lang.ClassCastException in JdbcUtils.setRecordToStatement.


The problem only occurs with the Blink planner; if the program is started with useOldPlanner() it does not occur.


张健

Is FlinkKafkaProducer state compatible with KafkaSink sink? How to migrate?

2022-01-11 Thread Kevin Lam
Hi all,

We're looking to migrating from FlinkKafkaProducer to the new KafkaSink for
the new unified Sink API.

Is the state compatible across the two Kafka sink APIs? If not, what's the
best way to migrate from one to the other?
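
(For reference, a rough sketch of the new unified-sink API this question is about,
in the Flink 1.14 style; the broker address and topic name are placeholders:)

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class KafkaSinkSketch {

    public static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")          // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")    // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }

    public static void attach(DataStream<String> stream) {
        // replaces stream.addSink(new FlinkKafkaProducer<>(...))
        stream.sinkTo(buildSink());
    }
}
```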

Thanks in advance,
Kevin


Async IO code not working

2022-01-11 Thread Siddhesh Kalgaonkar
I am using the below code to get the data from the side output, which has the
filtered records.
So, it goes like this:

val filterRecords: DataStream[String] = src.process(new
ProcessFunction()).getSideOutput(filteredOutputTag)

It has filtered records in it.

Now, I want to add these records to the db asynchronously. Therefore, I
wrote the below code using the documentation as reference:

val asyncFunction:AsyncFunction[String,String]=new DBAsyncSink() //SO
reference
AsyncDataStream.unorderedWait(goodRecords,new DBAsyncSink(), 1000,
TimeUnit.SECONDS, 100) //Documentation Reference

and the class for the "DBAsyncSink" is as follows:

class DBAsyncSink extends RichAsyncFunction[String,String] {

  override def open(parameters: Configuration): Unit = {

  }

  override def asyncInvoke(input:String, resultFuture:
ResultFuture[String]): Unit = {

  }

  override def close(): Unit = {
session.close()
  }

}

I am getting below error:

type mismatch;
 found   : org.apache.flink.streaming.api.scala.DataStream[String]
 required: org.apache.flink.streaming.api.datastream.DataStream[?]

What am I missing over here? I tried a couple of examples but they didn't
work.

Thanks,
Sid


Re: unaligned checkpoint for job with large start delay

2022-01-11 Thread Mason Chen
Hi Piotrek,

No worries—I hope you had a good break.

> Counting how many windows have been registered/fired and plotting that over 
> time.
It’s straightforward to count windows that are fired (the trigger exposes the 
run time context and we can collect the information in that code path). 
However, it’s not so clear how to count the windows that have been registered, 
since the window assigner does not expose the run time context—is this even the 
right place to count? It’s not necessarily the case that an assignment results 
in a new window being registered. Am I missing anything else relevant from the 
user-facing interface perspective?

>  Unfortunately at the moment I don't know how to implement such a metric 
> without affecting performance on the critical path, so I don't see this 
> happening soon :(
Perhaps, it can be an opt-in feature? I do see it being really useful since 
most users aren’t really familiar with windows and these metrics can help 
easily identify the common problem of too many windows firing.

The additional metrics certainly help in diagnosing some of the symptoms of the 
root problem.

Best,
Mason

> On Jan 10, 2022, at 1:00 AM, Piotr Nowojski  wrote:
> 
> Hi Mason,
> 
> Sorry for a late reply, but I was OoO.
> 
> I think you could confirm it with more custom metrics. Counting how many 
> windows have been registered/fired and plotting that over time.
> 
> I think it would be more helpful in this case to check how long a task has 
> been blocked being "busy" processing, for example, timers. FLINK-25414 shows 
> only being blocked on hard/soft backpressure. Unfortunately at the moment I 
> don't know how to implement such a metric without affecting performance on 
> the critical path, so I don't see this happening soon :(
> 
> Best,
> Piotrek
> 
> On Tue, 4 Jan 2022 at 18:02, Mason Chen wrote:
> Hi Piotrek,
> 
>> In other words, something (presumably a watermark) has fired more than 151 
>> 200 windows at once, which is taking ~1h 10minutes to process and during 
>> this time the checkpoint can not make any progress. Is this number of 
>> triggered windows plausible in your scenario?
> 
> It seems plausible—there are potentially many keys (and many windows). Is 
> there a way to confirm with metrics? We can add a window fire counter to the 
> window operator that only gets incremented at the end of window evaluation, 
> in order to see the huge jumps in window fires. I can see this benefiting other 
> users who troubleshoot the problem of a large number of windows firing.
> 
> Best,
> Mason
> 
>> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski wrote:
>> 
>> Hi Mason,
>> 
>> > and it has to finish processing this output before checkpoint can begin—is 
>> > this right?
>> 
>> Yes. The checkpoint will only be executed once all triggered windows have been 
>> fully processed. 
>> 
>> But from what you have posted it looks like all of that delay is coming from 
>> hundreds of thousands of windows firing all at the same time. Between 20:30 
>> and ~21:40 there must have been a bit more than 36 triggers/s * 60 s/min * 
>> 70 min = 151 200 triggers fired at once (or in a very short interval). In 
>> other words, something (presumably a watermark) has fired more than 151 200 
>> windows at once, which is taking ~1h 10 minutes to process and during this 
>> time the checkpoint can not make any progress. Is this number of triggered 
>> windows plausible in your scenario?
>> 
>> Best,
>> Piotrek
>> 
>> 
>> On Thu, 23 Dec 2021 at 12:12, Mason Chen wrote:
>> Hi Piotr,
>> 
>> Thanks for the thorough response and the PR—will review later.
>> 
>> Clarifications:
>> 1. The flat map you refer to produces at most 1 record.
>> 2. The session window operator’s window process function emits at least 1 
>> record. 
>> 3. The 25 ms sleep is at the beginning of the window process function.
>> 
>> Your explanation about how records being bigger than the buffer size can 
>> cause blockage makes sense to me. However, my average record size is around 
>> 770 bytes coming out of the source and 960 bytes coming out of the window. 
>> Also, we don’t override the default `taskmanager.memory.segment-size`. My 
>> Flink job memory config is as follows:
>> 
>> ```
>> taskmanager.memory.jvm-metaspace.size: 512 mb
>> taskmanager.memory.jvm-overhead.max: 2Gb
>> taskmanager.memory.jvm-overhead.min: 512Mb
>> taskmanager.memory.managed.fraction: '0.4'
>> taskmanager.memory.network.fraction: '0.2'
>> taskmanager.memory.network.max: 2Gb
>> taskmanager.memory.network.min: 200Mb
>> taskmanager.memory.process.size: 16Gb
>> taskmanager.numberOfTaskSlots: '4'
>> ```
>> 
>>>  Are you sure your job is making any progress? Are records being processed? 
>>> Hasn't your job simply deadlocked on something?
>> 
>> To distinguish task blockage vs graceful backpressure, I have checked the 
>> 

Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-11 Thread Kevin Lam
Hi Fabian,

No problem, thanks for the clarification. In terms of its importance, we
have some Flink applications running using StreamExecutionEnvironment.readFiles,
so in order to adopt the new FileSource API, we would need to migrate those
applications. Ideally, we could migrate the state. If there isn't a way to
migrate state, it would be nice if there were some documentation or
guidance from the Flink community on how best to migrate.
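
(For concreteness, the kind of FLIP-27 source we would be migrating those jobs to looks
roughly like the sketch below — assuming Flink 1.14 class names; the input path and scan
interval are placeholders. The open question is only the operator state migration.)

```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Roughly the readFile(..., PROCESS_CONTINUOUSLY, ...) equivalent:
        // the directory is re-scanned every 30 seconds for new files.
        FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///input"))
                        .monitorContinuously(Duration.ofSeconds(30))
                        .build();

        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
        lines.print();

        env.execute("file source sketch");
    }
}
```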

Cheers,
Kevin

On Tue, Jan 11, 2022 at 10:19 AM Fabian Paul  wrote:

> Hi Kevin,
>
> Sorry for the misleading information. The FileSink is compatible with
> the predecessor but unfortunately, it is not the case for the
> FileSource. I updated the ticket accordingly. Perhaps there is a way
> to migrate the state but it would be a larger effort. Is this an
> important feature for you?
>
> Best,
> Fabian
>
> On Mon, Jan 10, 2022 at 3:58 PM Kevin Lam  wrote:
> >
> > Hi Fabian,
> >
> > Thanks for creating and sharing that ticket. I noticed the clause "The
> FileSource can already read the state of the previous version", a little
> off-topic from the original topic of this thread but I was wondering if you
> could elaborate on that. Can the new FileSource interoperate with the old
> .readFile operator state? Is there a smooth way to upgrade to the new
> FileSource API from the old one without losing state?
> >
> > Thanks!
> >
> > On Mon, Jan 10, 2022 at 7:20 AM Fabian Paul  wrote:
> >>
> >> Hi Kevin,
> >>
> >> I created a ticket to track the effort [1]. Unfortunately, we are
> >> already in the last few weeks of the release cycle for 1.15 so I
> >> cannot guarantee that someone can implement it until then.
> >>
> >> Best,
> >> Fabian
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-25591
> >>
> >> On Fri, Jan 7, 2022 at 5:07 PM Kevin Lam  wrote:
> >> >
> >> > Hi all,
> >> >
> >> > Are there any plans to update StreamExecutionEnvironment.readFiles to
> use the new FLIP-27 compatible FileSource?
> >> >
> >> > readFiles supports some features via it's FileInputFormat like
> setNestedFileEnumeration and setFilesFilter that we'd be interested in
> continuing to use but it seems FileSource doesn't have that.
>


Re: Sorting/grouping keys and State management in BATCH mode

2022-01-11 Thread Chesnay Schepler

Looping in Dawid who can hopefully answer your questions.

On 11/01/2022 13:00, Krzysztof Chmielewski wrote:

Hi,
I'm reading the docs and FLIP-140 available for BATCH mode [1][2], where it 
says that
"In BATCH mode, the configured state backend is ignored. Instead, 
the input of a keyed operation is grouped by key (using sorting) and 
then we process all records of a key in turn." [1]


I would like to ask:
1. Where (heap, off-heap) does Flink keep records for BATCH streams if the 
configured state backend is ignored? In FLIP-140 I see there was a 
new State implementation created that is prepared to keep only one 
key's value, but there is no information on "where" in memory it is 
kept.


2. Where does the sorting algorithm keep its intermediate results?
How/who knows that there will be no more records for a given key?

If I get it right, sorting is done through the ExternalSorter class. Is 
there any documentation or usage example for ExternalSorter and a 
description of the SortStage values like READ, SORT, SPILL?


Regards,
Krzysztof Chmielewski


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Howtosort/groupkeys




Re: StaticFileSplitEnumerator - keeping track 9f already processed files

2022-01-11 Thread Fabian Paul
Hi Krzysztof,

Thanks for your investigation. Can you maybe share the code with us?
collectWithClient will insert a custom sink into the datastream that
buffers all incoming records and makes them queryable. It is already
deprecated and one should use executeAndCollect, which fulfills the same
purpose.

There is a difference between the execution modes streaming and batch,
and boundedness of a source. It is possible to execute a bounded
source in streaming mode and therefore have checkpoints.

For your experiments did you only change the boundedness or also the
runtime mode? [1]

Best,
Fabian

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Tue, Jan 11, 2022 at 12:04 PM Krzysztof Chmielewski
 wrote:
>
> Hi Fabian,
> Thank you for your input and I'm sorry for delay on my part.
>
> Before I will create a ticket I would like to ask about one thing more.
> There is a test, 
> FileSourceTextLinesITCase::testBoundedTextFileSourceWithJobManagerFailover.
> This test uses DataStreamUtils.collectWithClient(...) which returns an 
> iterator that we later use to get the processing results.
>
> I did a quick PoC where I created my own FileSource that uses 
> alreadyProcessedFiles Set in Bounded mode, it is based on FileSource 
> implementation.
> I noticed some issues with this test, when i use it for my Bounded Split 
> Enumerator that keeps track of already processed files. For example
>
> Case 1:
> a) set execution mode to Streaming
> b) set checkpoint interval to 10 millis
> Result: test fails because the result has fewer records than expected; 
> actually it reports zero records in the result.
>
> Case 2:
> a) set execution mode to Streaming
> b) disable checkpoint
> Result: test passes
>
> Case 3:
> a) set execution mode to Bounded
> b) disable checkpoint
> Result Test Passes
>
> Case 3:
> a) set execution mode to Bounded
> b) enable checkpoint
> Result Test Passes (since checkpoints are ignored in BATCH mode)
>
> I looked at the testBoundedTextFileSource and testContinuousTextFileSource 
> methods and I understand the idea of how the cluster failover is triggered and so on.
> Although I do see that gathering the final results for continuous mode is 
> slightly different.
> Could you shed some light on this and on how collectWithClient works, 
> especially in case of a failover?
>
> Thanks,
> Krzysztof Chmielewski
>
> On Thu, 6 Jan 2022 at 09:29, Fabian Paul wrote:
>>
>> Hi,
>>
>> I think your analysis is correct. One thing to note here is that I
>> guess when implementing the StaticFileSplitEnumerator we only thought
>> about the batch case where no checkpoints exist [1] on the other hand
>> it is possible as you have noted to run a bounded source in streaming
>> mode.
>>
>> Although in the current implementation we already checkpoint the
>> remaining splits of the StaticFileSplitEnumerator so it should be easy
>> to also pass the alreadyDiscoveredPaths to the
>> StaticFileSplitEnumerator.
>>
>> @Krzysztof Chmielewski can you create a ticket for that?
>>
>> Best,
>> Fabian
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>>
>> On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
>>  wrote:
>> >
>> > Hi,
>> > Yes I know that ContinuousFileSplitEnumerator has continuously scan the 
>> > monitored folder for the new files and StaticFileSplitEnumerator does not, 
>> > this is clear.
>> >
>> > However I was asking about a different scenario, the scenario when we are 
>> > restoring from a checkpoint.
>> > FileSource can process many files, not only one. The underlying API uses 
>> > array of paths not just single Path.
>> >
>> > If I understand correctly, when we are recovering from a checkpoint, for 
>> > example due to Job Manager issue, FileEnumerator will create an Array of 
>> > Splits and pass it to StaticFileSplitEnumerator.
>> >
>> > Same goes for ContinuousFileSplitEnumerator. However  when 
>> > ContinuousFileSplitEnumerator is started, it iterates through Path[] array 
>> > and checks which files were already processed and skip them using 
>> > pathsAlreadyProcessed set hence not creating Splits for those files.
>> >
>> > However it seems that StaticFileSplitEnumerator will reprocess files that 
>> > were already used for Split creation. In case of Checkpoint restoration it 
>> > does not check if that file was already processed.
>> >
>> > Regards,
>> > Krzysztof Chmielewski
>> >
>> >
>> >
>> >
>> > On Thu, 6 Jan 2022 at 03:21, Caizhi Weng wrote:
>> >>
>> >> Hi!
>> >>
>> >> Do you mean the pathsAlreadyProcessed set in 
>> >> ContinuousFileSplitEnumerator?
>> >>
>> >> This is because ContinuousFileSplitEnumerator has to continuously add new 
>> >> files to splitAssigner, while StaticFileSplitEnumerator does not. The 
>> >> pathsAlreadyProcessed set records the paths already discovered by 
>> >> ContinuousFileSplitEnumerator so that it will not add 

Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-11 Thread Fabian Paul
Hi Kevin,

Sorry for the misleading information. The FileSink is compatible with
the predecessor but unfortunately, it is not the case for the
FileSource. I updated the ticket accordingly. Perhaps there is a way
to migrate the state but it would be a larger effort. Is this an
important feature for you?

Best,
Fabian

On Mon, Jan 10, 2022 at 3:58 PM Kevin Lam  wrote:
>
> Hi Fabian,
>
> Thanks for creating and sharing that ticket. I noticed the clause "The 
> FileSource can already read the state of the previous version", a little 
> off-topic from the original topic of this thread but I was wondering if you 
> could elaborate on that. Can the new FileSource interoperate with the old 
> .readFile operator state? Is there a smooth way to upgrade to the new 
> FileSource API from the old one without losing state?
>
> Thanks!
>
> On Mon, Jan 10, 2022 at 7:20 AM Fabian Paul  wrote:
>>
>> Hi Kevin,
>>
>> I created a ticket to track the effort [1]. Unfortunately, we are
>> already in the last few weeks of the release cycle for 1.15 so I
>> cannot guarantee that someone can implement it until then.
>>
>> Best,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-25591
>>
>> On Fri, Jan 7, 2022 at 5:07 PM Kevin Lam  wrote:
>> >
>> > Hi all,
>> >
>> > Are there any plans to update StreamExecutionEnvironment.readFiles to use 
>> > the new FLIP-27 compatible FileSource?
>> >
>> > readFiles supports some features via it's FileInputFormat like 
>> > setNestedFileEnumeration and setFilesFilter that we'd be interested in 
>> > continuing to use but it seems FileSource doesn't have that.


Re: [E] Re: Orphaned job files in HDFS

2022-01-11 Thread David Clutter
Ok, that makes sense.  I did see some job failures.  However, failures could
happen occasionally.  Is there any option to have the job manager clean up
these directories when the job has failed?

On Mon, Jan 10, 2022 at 8:58 PM Yang Wang  wrote:

> IIRC, the staging directory (/user/{name}/.flink/application_xxx) will be
> deleted automatically if the Flink job reaches a global terminal state (e.g.
> FINISHED, CANCELED, FAILED).
> So I assume you have stopped the yarn application via "yarn application
> -kill", not via "bin/flink cancel".
> If it is the case, then having the residual staging directory is an
> expected behavior since Flink JobManager does not have a chance to do the
> clean-up.
>
>
>
> Best,
> Yang
>
> David Clutter  于2022年1月11日周二 10:08写道:
>
>> I'm seeing files orphaned in HDFS and wondering how to clean them up when
>> the job is completed.  The directory is /user/yarn/.flink so I am assuming
>> this is created by flink?  The HDFS in my cluster eventually fills up.
>>
>> Here is my setup:
>>
>>- Flink 1.13.1 on AWS EMR
>>- Executing flink in per-job mode
>>- Job is submitted every 5m
>>
>> In HDFS under /user/yarn/.flink I see a directory created for every flink
>> job submitted/yarn application.  Each application directory contains my
>> user jar file, flink-dist jar, /lib with various flink jars,
>> log4j.properties.
>>
>> Is there a property to tell flink to clean up this directory when the job
>> is completed?
>>
>


Re: Could not find any factory for identifier 'jdbc'

2022-01-11 Thread Chesnay Schepler
How do you ensure that the connector is actually available at runtime? 
Are you bundling it in your jar or putting it into Flink's lib directory?


On 11/01/2022 14:14, Ronak Beejawat (rbeejawa) wrote:

Correcting subject -> Could not find any factory for identifier 'jdbc'

From: Ronak Beejawat (rbeejawa)
Sent: Tuesday, January 11, 2022 6:43 PM
To: 'd...@flink.apache.org' ; 'commun...@flink.apache.org' 
; 'user@flink.apache.org' 
Cc: 'Hang Ruan' ; Shrinath Shenoy K (sshenoyk) ; Karthikeyan 
Muthusamy (karmuthu) ; Krishna Singitam (ksingita) ; Arun Yadav 
(aruny) ; Jayaprakash Kuravatti (jkuravat) ; Avi Sanwal (asanwal) 

Subject: what is efficient way to write Left join in flink

Hi Team,

Getting below exception while using jdbc connector :

Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'jdbc' that implements 
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
print
upsert-kafka


I have already added dependency for jdbc connector in pom.xml as mentioned 
below:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>1.14.2</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.41</version>
</dependency>

Referred release doc link for the same 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/



Please help me on this and provide the solution for it !!!


Thanks
Ronak Beejawat





Re: Cannot load user class: avro GenericRecord

2022-01-11 Thread Jason Politis
I realize those 2 specific ones are commented out, but I believe they are
only used in flink-sql, and there's currently an issue when I include them
in the pom.  I might not have the repos correct.



Thank you


Jason Politis
Solutions Architect, Carrera Group
carrera.io
| jpoli...@carrera.io 



On Tue, Jan 11, 2022 at 8:13 AM Jason Politis  wrote:

> http://maven.apache.org/POM/4.0.0; xmlns:xsi="
> http://www.w3.org/2001/XMLSchema-instance;
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
> 4.0.0
>
> boars_head
> w_build_d
> 0.1
> jar
>
> Cql Flink DataStream Java
> https://flink.apache.org
>
> 
> UTF-8
> 1.14.0
> 1.8
> 2.11
> ${target.java.version}
> ${target.java.version}
> 2.14.1
> 2.6.2
> 
>
> 
> 
> apache.snapshots
> Apache Development Snapshot Repository
> https://repository.apache.org/content/repositories/snapshots/
> 
> false
> 
> 
> true
> 
> 
> 
> central
> Central Repository
> https://repo1.maven.org/maven2/
> 
> false
> 
> 
> true
> 
> 
> 
>
> 
> 
> org.apache.flink
> flink-walkthrough-common_${scala.binary.version}
> ${flink.version}
> 
>
> 
> 
> org.apache.flink
> flink-streaming-java_${scala.binary.version}
> ${flink.version}
> provided
> 
> 
> org.apache.flink
> flink-table-api-java-bridge_${scala.binary.version} artifactId>
> ${flink.version}
> provided
> 
> 
> org.apache.flink
> flink-clients_${scala.binary.version}
> ${flink.version}
> provided
> 
> 
> org.apache.flink
> flink-table-planner_${scala.binary.version}
> ${flink.version}
> provided
> 
> 
> org.apache.flink
> flink-streaming-scala_${scala.binary.version}
> ${flink.version}
> provided
> 
>
> 
> 
> org.apache.flink
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.flink
> flink-avro-confluent-registry
> ${flink.version}
> 
> 
> 
> com.github.housepower
> clickhouse-native-jdbc-shaded
> ${clickhouse-native-jdbc.version}
> 
> 
> org.apache.flink
> flink-connector-jdbc_${scala.binary.version}
> ${flink.version}
> 
> 
> 
> 
> org.apache.logging.log4j
> log4j-slf4j-impl
> ${log4j.version}
> runtime
> 
> 
> org.apache.logging.log4j
> log4j-api
> ${log4j.version}
> runtime
> 
> 
> org.apache.logging.log4j
> log4j-core
> ${log4j.version}
> runtime
> 
> 
>
> 
> 
>
> 
> 
> org.apache.maven.plugins
> maven-compiler-plugin
> 3.1
> 
> ${target.java.version}
> ${target.java.version}
> 
> 
>
> 
> 
> 
> org.apache.maven.plugins
> maven-shade-plugin
> 3.0.0
> 
> 
> 
> package
> 
> shade
> 
> 
> 
> 
> org.apache.flink:flink-shaded-force-shading
> com.google.code.findbugs:jsr305
> org.slf4j:*
> org.apache.logging.log4j:*
> 
> 
> 
> 
> 
> *:*
> 
> META-INF/*.SF
> META-INF/*.DSA
> META-INF/*.RSA
> 
> 
> 
> 
>  "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> carrera.build.BuildFlinkJob
> 
> 
> 
> 
> 
> 
> 
>
> 
> 
>
> 
> 
> org.eclipse.m2e
> lifecycle-mapping
> 1.0.0
> 
> 
> 
> 
> 
> org.apache.maven.plugins
> maven-shade-plugin
> [3.0.0,)
> 
> shade
> 
> 
> 
> 
> 
> 
> 
> 
> org.apache.maven.plugins
> maven-compiler-plugin
> [3.1,)
> 
> testCompile
> compile
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>
>
> Above is the POM.
>
> Here is the entry in our docker-compose where we drop the jars into the
> opt/flink/lib folder:
>
> volumes:
> -
> './connectors/flink-sql-avro-confluent-registry-1.14.0.jar:/opt/flink/lib/flink-sql-avro-confluent-registry-1.14.0.jar'
> -
> './connectors/flink-sql-connector-kafka_2.11-1.14.0.jar:/opt/flink/lib/flink-sql-connector-kafka_2.11-1.14.0.jar'
>
> Thank you
>
>
> Jason Politis
> Solutions Architect, Carrera Group
> carrera.io
> | jpoli...@carrera.io 
> 
>
>
> On Mon, Jan 10, 2022 at 9:31 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> Could you share your pom.xml file of your user project? Did you include
>> the flink-avro dependency? Also did you add the avro format jar to the lib
>> directory of your Flink distribution?
>>
>> Jason Politis  于2022年1月11日周二 08:42写道:
>>
>>> Good evening all,
>>>
>>> I'm working on a project for a client.  We are trying to execute Flink
>>> SQL using Table API in java.
>>> We are going to pull their data from oracle -> debezium -> kafka ->
>>> flink.
>>>
>>>
>>> Here is a sample of our java code:
>>>
>>> package carrera.build;
>>>
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableEnvironment;
>>>
>>> public class BuildFlinkJob {
>>> public static void main(String[] args) throws Exception {
>>> EnvironmentSettings settings = 
>>> EnvironmentSettings.inStreamingMode();
>>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>>
>>> tEnv.executeSql(
>>> "CREATE TABLE BUILDS (\n" +
>>> "`PARTITION` INT METADATA FROM 'partition',\n" +
>>> "`OFFSET` BIGINT METADATA FROM 'offset',\n" +
>>> "BUILD_ID 

Re: Cannot load user class: avro GenericRecord

2022-01-11 Thread Jason Politis
http://maven.apache.org/POM/4.0.0; xmlns:xsi="
http://www.w3.org/2001/XMLSchema-instance;
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
4.0.0

boars_head
w_build_d
0.1
jar

Cql Flink DataStream Java
https://flink.apache.org


UTF-8
1.14.0
1.8
2.11
${target.java.version}
${target.java.version}
2.14.1
2.6.2




apache.snapshots
Apache Development Snapshot Repository
https://repository.apache.org/content/repositories/snapshots/

false


true



central
Central Repository
https://repo1.maven.org/maven2/

false


true






org.apache.flink
flink-walkthrough-common_${scala.binary.version}
${flink.version}




org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}
provided


org.apache.flink
flink-table-api-java-bridge_${scala.binary.version}
${flink.version}
provided


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}
provided


org.apache.flink
flink-table-planner_${scala.binary.version}
${flink.version}
provided


org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}
provided




org.apache.flink
flink-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink
flink-avro-confluent-registry
${flink.version}



com.github.housepower
clickhouse-native-jdbc-shaded
${clickhouse-native-jdbc.version}


org.apache.flink
flink-connector-jdbc_${scala.binary.version}
${flink.version}




org.apache.logging.log4j
log4j-slf4j-impl
${log4j.version}
runtime


org.apache.logging.log4j
log4j-api
${log4j.version}
runtime


org.apache.logging.log4j
log4j-core
${log4j.version}
runtime








org.apache.maven.plugins
maven-compiler-plugin
3.1

${target.java.version}
${target.java.version}






org.apache.maven.plugins
maven-shade-plugin
3.0.0



package

shade




org.apache.flink:flink-shaded-force-shading
com.google.code.findbugs:jsr305
org.slf4j:*
org.apache.logging.log4j:*





*:*

META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA





carrera.build.BuildFlinkJob













org.eclipse.m2e
lifecycle-mapping
1.0.0





org.apache.maven.plugins
maven-shade-plugin
[3.0.0,)

shade








org.apache.maven.plugins
maven-compiler-plugin
[3.1,)

testCompile
compile
















Above is the POM.

Here is the entry in our docker-compose where we drop the jars into the
opt/flink/lib folder:

volumes:
-
'./connectors/flink-sql-avro-confluent-registry-1.14.0.jar:/opt/flink/lib/flink-sql-avro-confluent-registry-1.14.0.jar'
-
'./connectors/flink-sql-connector-kafka_2.11-1.14.0.jar:/opt/flink/lib/flink-sql-connector-kafka_2.11-1.14.0.jar'

Thank you


Jason Politis
Solutions Architect, Carrera Group
carrera.io
| jpoli...@carrera.io 



On Mon, Jan 10, 2022 at 9:31 PM Caizhi Weng  wrote:

> Hi!
>
> Could you share your pom.xml file of your user project? Did you include
> the flink-avro dependency? Also did you add the avro format jar to the lib
> directory of your Flink distribution?
>
> Jason Politis  于2022年1月11日周二 08:42写道:
>
>> Good evening all,
>>
>> I'm working on a project for a client.  We are trying to execute Flink
>> SQL using Table API in java.
>> We are going to pull their data from oracle -> debezium -> kafka -> flink.
>>
>>
>> Here is a sample of our java code:
>>
>> package carrera.build;
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public class BuildFlinkJob {
>> public static void main(String[] args) throws Exception {
>> EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>
>> tEnv.executeSql(
>> "CREATE TABLE BUILDS (\n" +
>> "`PARTITION` INT METADATA FROM 'partition',\n" +
>> "`OFFSET` BIGINT METADATA FROM 'offset',\n" +
>> "BUILD_ID DOUBLE,\n" +
>> "BUILD_NAME STRING,\n" +
>> "FACILITY_NUMBER STRING,\n" +
>> "START_DATE TIMESTAMP(2),\n" +
>> "END_DATE TIMESTAMP(2),\n" +
>> "RETAILERDIVISION_NAME STRING,\n" +
>> "UPC STRING,\n" +
>> "BUILD_INSTRUCTIONS STRING,\n" +
>> "WORK_INSTRUCTIONS STRING,\n" +
>> "IMAGE_FILE_PATH STRING\n" +
>> ") WITH (\n" +
>> "'connector' = 'kafka',\n" +
>> "'topic' = 
>> 'clients-name.OBIANEW_SDS_EBS_12_1_3.BUILDS',\n" +
>> "'properties.bootstrap.servers' = 'broker:29092',\n" +
>> "'properties.group.id' = 'builds',\n" +
>> "'format' = 'debezium-avro-confluent',\n" +
>> "'debezium-avro-confluent.url' = 
>> 'http://schema-registry:8081',\n" +
>> "'scan.startup.mode' = 

Sorting/grouping keys and State management in BATCH mode

2022-01-11 Thread Krzysztof Chmielewski
Hi,
I'm reading the docs and FLIP-140 available for BATCH mode [1][2], where it says
that
"In BATCH mode, the configured state backend is ignored. Instead, the
input of a keyed operation is grouped by key (using sorting) and then we
process all records of a key in turn." [1]

I would like to ask:
1. Where (heap, off-heap) does Flink keep records for BATCH streams if the
configured state backend is ignored? In FLIP-140 I see there was a new
State implementation created that is prepared to keep only one key's value,
but there is no information on "where" in memory it is kept.

2. Where does the sorting algorithm keep its intermediate results?
How/who knows that there will be no more records for a given key?

If I get it right, sorting is done through the ExternalSorter class. Is there
any documentation or usage example for ExternalSorter and a description of
the SortStage values like READ, SORT, SPILL?

Regards,
Krzysztof Chmielewski


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Howtosort/groupkeys
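
As a side note, a minimal self-contained sketch, assuming the DataStream API on
Flink 1.12 or later and not taken from the thread, of forcing the batch-style
execution that the quoted passage describes:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchKeyedSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Batch-style execution on a bounded pipeline: the configured state backend is
        // ignored and keyed input is sorted by key before the keyed operator runs.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3))
                .keyBy(t -> t.f0)
                // All records of one key are processed in turn before the next key starts,
                // so only the final sum per key is emitted.
                .sum(1)
                .print();

        env.execute("batch-keyed-sketch");
    }
}

In this mode each key's records reach the keyed operator contiguously, which is
why a state implementation holding a single key's value at a time is sufficient.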


(no subject)

2022-01-11 Thread 生如夏花
Unsubscribe

Re: StaticFileSplitEnumerator - keeping track of already processed files

2022-01-11 Thread Krzysztof Chmielewski
Hi Fabian,
Thank you for your input and I'm sorry for delay on my part.

Before I create a ticket I would like to ask about one more thing.
There is a
test, FileSourceTextLinesITCase::testBoundedTextFileSourceWithJobManagerFailover.
This test uses DataStreamUtils.collectWithClient(...) which returns an
iterator that we later use to get the processing results.

I did a quick PoC where I created my own FileSource that uses an
alreadyProcessedFiles Set in bounded mode; it is based on the FileSource
implementation.
I noticed some issues with this test when I use it for my bounded split
enumerator that keeps track of already processed files. For example:

Case 1:
a) set execution mode to Streaming
b) set checkpoint interval to 10 millis
Result: test fails because the result has fewer records than expected;
actually it reports zero records in the result.

Case 2:
a) set execution mode to Streaming
b) disable checkpoint
Result: test passes

Case 3:
a) set execution mode to Bounded
b) disable checkpoint
Result Test Passes

Case 4:
a) set execution mode to Bounded
b) enable checkpoint
Result Test Passes (since checkpoints are ignored in BATCH mode)

I looked at the testBoundedTextFileSource and testContinuousTextFileSource
methods and I understand the idea of how the cluster failover is triggered
and so on.
However, I do see that gathering the final results for continuous mode is
slightly different.
Could you shed some light on this and on how collectWithClient works,
especially in the case of a failover?

Thanks,
Krzysztof Chmielewski

On Thu, Jan 6, 2022 at 09:29, Fabian Paul wrote:

> Hi,
>
> I think your analysis is correct. One thing to note here is that I
> guess when implementing the StaticFileSplitEnumerator we only thought
> about the batch case where no checkpoints exist [1]; on the other hand,
> it is possible, as you have noted, to run a bounded source in streaming
> mode.
>
> Although in the current implementation we already checkpoint the
> remaining splits of the StaticFileSplitEnumerator so it should be easy
> to also pass the alreadyDiscoveredPaths to the
> StaticFileSplitEnumerator.
>
> @Krzysztof Chmielewski can you create a ticket for that?
>
> Best,
> Fabian
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>
> On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
>  wrote:
> >
> > Hi,
> > Yes, I know that ContinuousFileSplitEnumerator continuously scans the
> monitored folder for new files and StaticFileSplitEnumerator does not;
> this is clear.
> >
> > However, I was asking about a different scenario, the one where we
> are restoring from a checkpoint.
> > FileSource can process many files, not only one. The underlying API uses
> an array of paths, not just a single Path.
> >
> > If I understand correctly, when we are recovering from a checkpoint, for
> example due to a Job Manager issue, the FileEnumerator will create an array
> of Splits and pass it to the StaticFileSplitEnumerator.
> >
> > The same goes for ContinuousFileSplitEnumerator. However, when
> ContinuousFileSplitEnumerator is started, it iterates through the Path[]
> array, checks which files were already processed, and skips them using the
> pathsAlreadyProcessed set, hence not creating Splits for those files.
> >
> > However, it seems that StaticFileSplitEnumerator will reprocess files
> that were already used for Split creation. In the case of checkpoint
> restoration it does not check whether a file was already processed.
> >
> > Regards,
> > Krzysztof Chmielewski
> >
> >
> >
> >
> > On Thu, Jan 6, 2022 at 03:21, Caizhi Weng
> wrote:
> >>
> >> Hi!
> >>
> >> Do you mean the pathsAlreadyProcessed set in
> ContinuousFileSplitEnumerator?
> >>
> >> This is because ContinuousFileSplitEnumerator has to continuously add
> new files to splitAssigner, while StaticFileSplitEnumerator does not. The
> pathsAlreadyProcessed set records the paths already discovered by
> ContinuousFileSplitEnumerator so that it will not add the same file to
> splitAssigner twice. For StaticFileSplitEnumerator it does not need to
> discover new files and all files have already been recorded in its
> splitAssigner so it does not need the pathsAlreadyProcessed set.
> >>
> >> For more detailed logic check the caller of the constructors of both
> enumerators.
> >>
> >> Krzysztof Chmielewski wrote on Thu, Jan 6, 2022
> at 07:04:
> >>>
> >>> Hi,
> >>> Why StaticFileSplitEnumerator from FileSource does not track the
> already processed files similar to how ContinuousFileSplitEnumerator does?
> >>>
> >>> I'm thinking about a scenario where we have a bounded FileSource that
> reads a lot of files and streams their content to Kafka. If
> there is a Job/cluster restart, we will process the same files again.
> >>>
> >>> Regards,
> >>> Krzysztof Chmielewski
>
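
As an aside, a simplified illustration of the idea discussed above: keep the
set of already-enumerated paths in the enumerator's checkpointed state so that
files are not re-processed after a failover. This is not Flink's actual
SplitEnumerator API; the names ProcessedPathTracker, filterNew and snapshot are
hypothetical.

import org.apache.flink.core.fs.Path;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: filters out files that were already turned into splits. */
public class ProcessedPathTracker {

    // Paths already handed out as splits; in a real enumerator this set would be
    // part of the checkpointed state so it survives a JobManager failover.
    private final Set<Path> alreadyProcessedPaths = new HashSet<>();

    /** Returns only the paths that have not been enumerated yet and records them. */
    public List<Path> filterNew(List<Path> discovered) {
        List<Path> fresh = new ArrayList<>();
        for (Path p : discovered) {
            if (alreadyProcessedPaths.add(p)) {
                fresh.add(p);
            }
        }
        return fresh;
    }

    /** A copy of the tracked paths, suitable for storing in a checkpoint snapshot. */
    public Set<Path> snapshot() {
        return new HashSet<>(alreadyProcessedPaths);
    }
}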


Re: How to determine how much memory to allocate

2022-01-11 Thread Chang Li
At the moment this is mostly an empirical value that depends on how your
specific workload uses memory. We suggest observing the GC behavior of the
JM and TM after the job has been running for a while and adjusting from there.

许友昌 <18243083...@163.com> wrote on Mon, Jan 10, 2022 at 15:18:

> When starting a Flink job, how do we determine how much memory to allocate
> to the jobmanager and how much to the taskmanager? When we specify -ytm 1024
> or -ytm 2048, what is that based on?


Re: Can someone explain the differences between the GlobalStreamExchangeMode exchange modes and their use cases?

2022-01-11 Thread Chang Li
Running Flink batch jobs in production is OK and does not strictly depend on
the Flink Remote Shuffle Service. The Flink Remote Shuffle Service mainly
addresses stability for shuffles over large data volumes: currently batch
jobs write shuffle results to local disk, and with large volumes the disks
easily fill up, so stability is comparatively poor.

casel.chen wrote on Thu, Dec 2, 2021 at 08:26:

> What are the differences between these GlobalStreamExchangeMode exchange
> modes and what are their use cases? Which ones suit streaming jobs and
> which suit batch jobs?
> Does the release of the Flink Remote Shuffle Service mean that Flink can be
> used for batch jobs in production? Thanks!
>
>
> package org.apache.flink.streaming.api.graph;
>
>
>
>
> import org.apache.flink.annotation.Internal;
>
>
>
>
> @Internal
>
> public enum GlobalStreamExchangeMode {
>
> ALL_EDGES_BLOCKING,
>
> FORWARD_EDGES_PIPELINED,
>
> POINTWISE_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED,
>
> ALL_EDGES_PIPELINED_APPROXIMATE;
>
>
>
>
> private GlobalStreamExchangeMode() {
>
> }
>
> }
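
For what it is worth, GlobalStreamExchangeMode is marked @Internal. A hedged
sketch, assuming Flink 1.14+, where the closest public knob is the
execution.batch-shuffle-mode option (set below as a plain string so no
internal classes are referenced):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchShuffleModeSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Blocking exchanges between all tasks: the classic batch behavior where
        // shuffle results are written out (e.g. to local disk) before consumers start.
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // A trivial bounded pipeline so the sketch actually runs.
        env.fromElements(1, 2, 3, 4)
                .keyBy(i -> i % 2)
                .reduce(Integer::sum)
                .print();

        env.execute("batch-shuffle-mode-sketch");
    }
}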
>
>
>


Re: Question about the files streamingFileSink generates under checkpointing

2022-01-11 Thread Chang Li
Are you using the open-source version directly, or does your company have
internal modifications? The vanilla checkpoint fires at a fixed interval,
while many companies' offline computation is triggered on the hour, so to
reduce latency some customize Flink to trigger an extra checkpoint on the
hour. The open-source version currently does not have this feature.

黄志高 wrote on Wed, Dec 1, 2021 at 21:53:

> Hi all, I have a question.
>
> My Flink version is 1.11.0. My job goes from kafka->s3 with a checkpoint
> interval of 10 minutes; it does no processing in between and simply writes
> the consumed data to the file system using streamingFileSink with the
> built-in bulk format builder. From the source code, the rolling policy in
> use is OnCheckpointRollingPolicy. However, in every hourly bucket a part
> file is also generated exactly on the hour, even though my checkpoints fire
> at minutes 02, 12, 22, 32, 42 and 52 and files are normally generated at
> those times. I went through the source code and could not find any logic
> that rolls a file on the hour. Could someone help me figure out where this
> on-the-hour file comes from and which period it belongs to? The attachment
> contains my job's checkpoint times and screenshots of the files generated
> at 01:00 and 02:00 on Nov 30, 2021; at minute 00 of both hours a file was
> generated. Thanks!
>
>
>
>
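
To make the setup under discussion concrete, here is a minimal sketch, assuming
Flink 1.11-era APIs and not taken from the poster's actual job, of a bulk-format
StreamingFileSink with hourly buckets and the OnCheckpointRollingPolicy. The
LineWriterFactory is a toy stand-in for the real gzip/Avro bulk writer:

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BulkSinkSketch {

    /** Toy bulk writer that writes each record as a line; replace with the real writer. */
    static final class LineWriterFactory implements BulkWriter.Factory<String> {
        @Override
        public BulkWriter<String> create(FSDataOutputStream out) {
            return new BulkWriter<String>() {
                @Override
                public void addElement(String element) throws IOException {
                    out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
                }

                @Override
                public void flush() throws IOException {
                    out.flush();
                }

                @Override
                public void finish() throws IOException {
                    flush();
                }
            };
        }
    }

    static StreamingFileSink<String> buildSink() {
        return StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/output"), new LineWriterFactory())
                // Hourly buckets: a new bucket (directory) starts at the top of every hour.
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                // Bulk formats roll only on checkpoint; this is also the default for forBulkFormat.
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }
}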


Re: Re: Question about the files streamingFileSink generates under checkpointing

2022-01-11 Thread Chang Li
Are you using the open-source version directly, or does your company have
internal modifications? The vanilla checkpoint fires at a fixed interval,
while many companies' offline computation is triggered on the hour, so to
reduce latency some customize Flink to trigger an extra checkpoint on the
hour. The open-source version currently does not have this feature.

黄志高 wrote on Thu, Dec 2, 2021 at 14:14:

> | 32684 | COMPLETED | 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B |
> | 32683 | COMPLETED | 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B |
> | 32682 | COMPLETED | 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B |
> | 32681 | COMPLETED | 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B |
> | 32680 | COMPLETED | 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B |
> | 32679 | COMPLETED | 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B |
>
> The table above is the checkpoint history.
>
>
> These are the files generated in the 00:00 hour on Nov 30:
> 2021-11-30 00:00:01  1080827  athena_other-0-217891.gz
> 2021-11-30 00:02:42  4309209  athena_other-0-217892.gz
> 2021-11-30 00:12:40  3902474  athena_other-0-217893.gz
> 2021-11-30 00:22:40  3886322  athena_other-0-217894.gz
> 2021-11-30 00:32:40  3988037  athena_other-0-217895.gz
> 2021-11-30 00:42:40  3892343  athena_other-0-217896.gz
> 2021-11-30 00:52:39  2972183  athena_other-0-217897.gz
> 2021-11-30 00:00:01  1125774  athena_other-1-219679.gz
> 2021-11-30 00:02:42  4338748  athena_other-1-219680.gz
> 2021-11-30 00:12:40  4204571  athena_other-1-219681.gz
> 2021-11-30 00:22:40  3852791  athena_other-1-219682.gz
> 2021-11-30 00:32:40  4025214  athena_other-1-219683.gz
> 2021-11-30 00:42:40  4205107  athena_other-1-219684.gz
> 2021-11-30 00:52:39  2922192  athena_other-1-219685.gz
> 2021-11-30 00:00:01  1103734  athena_other-2-220084.gz
>
>
> These are the files generated in the 01:00 hour:
> 2021-11-30 01:00:01  1228793  athena_other-0-217951.gz
> 2021-11-30 01:02:42  4243566  athena_other-0-217952.gz
> 2021-11-30 01:12:40  4106305  athena_other-0-217953.gz
> 2021-11-30 01:22:40  4456214  athena_other-0-217954.gz
> 2021-11-30 01:32:41  4303156  athena_other-0-217955.gz
> 2021-11-30 01:42:40  4688872  athena_other-0-217956.gz
> 2021-11-30 01:52:40  3251910  athena_other-0-217957.gz
> 2021-11-30 01:00:01  1163354  athena_other-1-219736.gz
> 2021-11-30 01:02:42  4405233  athena_other-1-219737.gz
> 2021-11-30 01:12:40  4094502  athena_other-1-219738.gz
> 2021-11-30 01:22:40  4395071  athena_other-1-219739.gz
> 2021-11-30 01:32:40  4205169  athena_other-1-219740.gz
> 2021-11-30 01:42:40  4432610  athena_other-1-219741.gz
> 2021-11-30 01:52:40  3224111  athena_other-1-219742.gz
> 2021-11-30 01:00:01  1163964  athena_other-2-220137.gz
>
>
>
>
> The earlier screenshots could not be delivered, so I have pasted the file listings above instead. Sorry for the trouble.
>
>
>
>
>
>
>
> On 2021-12-02 13:52:43, "黄志高" wrote:
>
>
>
>
>
> Hi, I have put the file listings below. I understand that files only become
> visible at a checkpoint, and normally their creation times should be right
> after a checkpoint, i.e. after minutes 02, 12, 22, 32, 42 and 52. However, a
> data file also shows up in every on-the-hour slot, as shown below, and I do
> not understand how that file is produced. The internal rolling policy is
> OnCheckpointRollingPolicy.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-12-02 11:37:31, "Caizhi Weng" wrote:
> >Hi!
> >
> >Images and attachments are not visible on the mailing list; we suggest
> >using an external image host.
> >
> >Do the partFile names start with a dot? That is because, to guarantee
> >exactly once, the part files that streamingFileSink is still writing before
> >a checkpoint are invisible; they are only renamed to visible files after
> >the checkpoint completes.
> >
> >黄志高 wrote on Wed, Dec 1, 2021 at 9:53 PM:
> >
> >> Hi all, I have a question.
> >>
> >> My Flink version is 1.11.0. My job goes from kafka->s3 with a checkpoint
> >> interval of 10 minutes; it does no processing in between and simply writes
> >> the consumed data to the file system using streamingFileSink with the
> >> built-in bulk format builder. From the source code, the rolling policy in
> >> use is OnCheckpointRollingPolicy. However, in every hourly bucket a part
> >> file is also generated exactly on the hour, even though my checkpoints fire
> >> at minutes 02, 12, 22, 32, 42 and 52 and files are normally generated at
> >> those times. I went through the source code and could not find any logic
> >> that rolls a file on the hour. Could someone help me figure out where this
> >> on-the-hour file comes from and which period it belongs to? The attachment
> >> contains my job's checkpoint times and screenshots of the files generated
> >> at 01:00 and 02:00 on Nov 30, 2021; at minute 00 of both hours a file was
> >> generated. Thanks!
> >>
> >>
> >>
> >>
>
>
>
>
>
>


Reply: How can a Flink SQL job increase its downstream parallelism?

2022-01-11 Thread 许友昌
hi,


If parallelism=10 is set, 10 slots are actually allocated; Flink shares
slots, so the sink will have 10 threads.

On Jan 11, 2022 at 16:53, RS wrote:
Hi,
A question: if parallelism=10 is set and the source kafka topic has 3
partitions, is the parallelism of the source, the downstream processing and
the sink 3 or 10?
If the source is 10, are the other 7 threads just idle?



On 2022-01-11 11:10:41, "Caizhi Weng" wrote:
Hi!

You can set parallelism.default to the desired parallelism.

Jeff wrote on Sun, Jan 9, 2022 at 19:44:

When the source is kafka, the maximum parallelism is determined by the
number of kafka partitions. Is there a way to increase the parallelism of
the whole job without adding kafka partitions?


Re: Configuring hosts for the TM and JM containers in Flink on Native K8s deployment mode

2022-01-11 Thread Yang Wang
You can specify the kube config via an environment variable or a flink config option:

export KUBECONFIG=/path/of/kube.config

or

-Dkubernetes.config.file=/path/of/kube.config

The relevant code is here [1].

[1].
https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java#L58


Best,
Yang

JianWen Huang wrote on Mon, Jan 10, 2022 at 22:04:

> First of all, thank you for your reply. I had also thought of the second
> approach: a dynamically started Job plus a ConfigMap mounted into the Flink
> client pod, then submitting via the command line.
> Also, both you and the official docs mention the kube config. Where in the
> source code does the flink client parse and read the kube config?
>
> Yang Wang wrote on Mon, Jan 10, 2022 at 15:17:
> >
> > Sorry for the late reply.
> >
> > In practice, the Flink on Native K8s deployment approach needs a machine
> > that has both the k8s and flink clients to complete the deployment smoothly.
> >
> > The Flink client does not depend on the K8s client at all; having the
> > corresponding kube config is enough.
> >
> >
> > Both approaches you mention work, and there is no essential difference:
> > both run the Flink client inside the cluster to do the submission; the
> > first is long-running, the second is started on demand.
> > If all jobs use the same pod template, you can keep it in a ConfigMap and
> > mount it into the Flink client pod.
> > If every job uses a different one, you can only follow the approach you
> > described.
> >
> >
> > In addition, another feasible idea is to develop your own K8s operator and
> > pass the information via a CR. You can refer to this simple demo [1].
> >
> > [1]. https://github.com/wangyang0918/flink-native-k8s-operator
> >
> >
> > Best,
> > Yang
> >
> >
> >
> > JianWen Huang wrote on Thu, Dec 30, 2021 at 00:01:
> >
> > > Understood. Thanks.
> > > In practice, the Flink on Native K8s deployment approach needs a machine
> > > that has both the k8s and flink clients to complete the deployment smoothly.
> > > Is there a good way to submit jobs from a continuous-integration setup in
> > > engineering practice? I can currently think of two.
> > > 1. Start a container with the flink client on k8s and submit from the
> > > command line inside the container.
> > > 2. Start a Job-type workload on k8s from an image containing the Flink
> > > client, and submit the command while that job is running.
> > >
> > >
> > > With the first, the template file referenced by kubernetes.pod-template-file
> > > has to be cp-ed into the container before submitting.
> > > With the second, the kubernetes.pod-template-file has to be baked into the
> > > image containing the Flink client ahead of time.
> > > Do you have a better approach?
> > >
> > > Yang Wang wrote on Sun, Dec 26, 2021 at 16:39:
> > > >
> > > > Taking the submission command below as an example: pod-template.yaml lives
> > > > on the same machine that runs the run-application command. The Flink
> > > > client automatically stores this file in a ConfigMap and mounts it into the JM.
> > > > The user jar (StateMachineExample.jar) needs to be inside the image.
> > > >
> > > > Note: things that need to be inside the image generally use the local://
> > > > scheme; client-local files do not.
> > > >
> > > > bin/flink run-application -t kubernetes-application \
> > > > -Dkubernetes.cluster-id=my-flink-cluster \
> > > > -Dkubernetes.pod-template-file=/path/of/pod-template.yaml \
> > > > local:///opt/flink/examples/streaming/StateMachineExample.jar
> > > >
> > > >
> > > >
> > > > If it is still unclear, take a look at the implementation of this test [1].
> > > >
> > > > [1].
> > > >
> > >
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > 黄剑文 wrote on Fri, Dec 24, 2021 at 17:57:
> > > >
> > > > > "A client-local file, not one inside the image": how should I
> > > > > understand this sentence? In run-application
> > > > >
> > > > > mode you have to bundle the user jar with the standard flink image to
> > > > > build your own image and then submit. So where should this file be
> > > > > placed? Currently, when I specify a path, it reads the path inside the
> > > > > image, e.g. /opt/my-pod-template: it reads the /opt/my-pod-template
> > > > > file in the image.
> > > > >
> > > > > Thank you for your reply.
> > > > >
> > > > > Yang Wang wrote on Fri, Dec 24, 2021 at 11:18:
> > > > > >
> > > > > > When submitting a job with flink
> > > > > >
> > > > > > run-application, kubernetes.pod-template-file must point to a
> > > > > > client-local file,
> > > > > > not one inside the image.
> > > > > >
> > > > > > Best,
> > > > > > Yang
> > > > > >
> > > > > > hjw <1010445...@qq.com.invalid> wrote on Thu, Dec 23, 2021 at 22:21:
> > > > > >
> > > > > > > Flink version: 1.13. Flink is deployed on Native K8s.
> > > > > > >
> > > > > > > In this deployment mode, for scenario-specific reasons the
> > > > > > > jobmanager and taskmanager need some specific hosts configured.
> > > > > > > After reading the official docs I found that you can supply your
> > > > > > > own pod-Template to control some of the K8s deployment behavior of
> > > > > > > the jm and tm, but these pod-Templates need to be baked into the
> > > > > > > submitting client image.
> > > > > > >
> > > > > > > The problem is that the hosts the jm and tm need are not the same
> > > > > > > across environments, e.g. development, testing and production. That
> > > > > > > means maintaining a different image per environment. Does anyone
> > > > > > > have a good way to solve this in practice? Thanks.
> > > > >
> > >
>


Re: How can a Flink SQL job increase its downstream parallelism?

2022-01-11 Thread Chang Li
Yes, it is possible. One approach: after reading from kafka, output the raw
byte stream directly and attach a map operator that does the
serialization/deserialization work. You can control the parallelism of that
map operator yourself, so the kafka fetch will not be the bottleneck: the
heavy computation moves into the map, whose parallelism you control. A rough
sketch of this pattern follows below.

Jeff wrote on Sun, Jan 9, 2022 at 19:44:

> When the source is kafka, the maximum parallelism is determined by the
> number of kafka partitions. Is there a way to increase the parallelism of
> the whole job without adding kafka partitions?
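
A minimal, hedged sketch of the pattern described above, assuming the
DataStream-API KafkaSource from flink-connector-kafka 1.14; the broker address,
topic, group id and the String-parsing map are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.nio.charset.StandardCharsets;

public class ParallelDeserializeSketch {

    /** Pass the Kafka record value through untouched; parsing happens later in the map. */
    static final class IdentityBytes extends AbstractDeserializationSchema<byte[]> {
        @Override
        public byte[] deserialize(byte[] message) {
            return message;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<byte[]> source = KafkaSource.<byte[]>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("parallel-deserialize")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new IdentityBytes())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-raw-bytes")
                .setParallelism(3)           // bounded by the 3 topic partitions anyway
                .rebalance()                 // spread the raw bytes across all downstream subtasks
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // stand-in for heavy parsing
                .setParallelism(10)          // the expensive work runs at parallelism 10
                .print();

        env.execute("parallel-deserialize-sketch");
    }
}

The rebalance() hop redistributes the raw records from the 3 source subtasks
across all 10 map subtasks, so the expensive parsing runs at the higher
parallelism.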


Reply: How can a Flink SQL job increase its downstream parallelism?

2022-01-11 Thread JasonLee
hi


It is 10. At the moment the source does not yet support setting its
parallelism separately, but the sink does; of course, if it is not set
separately, the sink is also 10.


Best
JasonLee


On Jan 11, 2022 at 16:52, RS wrote:
Hi,
A question: if parallelism=10 is set and the source kafka topic has 3
partitions, is the parallelism of the source, the downstream processing and
the sink 3 or 10?
If the source is 10, are the other 7 threads just idle?



On 2022-01-11 11:10:41, "Caizhi Weng" wrote:
Hi!

You can set parallelism.default to the desired parallelism.

Jeff wrote on Sun, Jan 9, 2022 at 19:44:

When the source is kafka, the maximum parallelism is determined by the
number of kafka partitions. Is there a way to increase the parallelism of
the whole job without adding kafka partitions?


Re: How can a Flink SQL job increase its downstream parallelism?

2022-01-11 Thread chang li
Yes, it is possible. One approach: after reading from kafka, output the raw
byte stream directly and attach a map operator that does the
serialization/deserialization work. You can control the parallelism of that
map operator yourself, so the kafka fetch will not be the bottleneck: the
heavy computation moves into the map, whose parallelism you control.

Caizhi Weng wrote on Tue, Jan 11, 2022 at 11:11:

> Hi!
>
> You can set parallelism.default to the desired parallelism.
>
> Jeff wrote on Sun, Jan 9, 2022 at 19:44:
>
> > When the source is kafka, the maximum parallelism is determined by the
> > number of kafka partitions. Is there a way to increase the parallelism of
> > the whole job without adding kafka partitions?
>


Re:Re: How can a Flink SQL job increase its downstream parallelism?

2022-01-11 Thread RS
Hi,
A question: if parallelism=10 is set and the source kafka topic has 3
partitions, is the parallelism of the source, the downstream processing and
the sink 3 or 10?
If the source is 10, are the other 7 threads just idle?



On 2022-01-11 11:10:41, "Caizhi Weng" wrote:
>Hi!
>
>You can set parallelism.default to the desired parallelism.
>
>Jeff wrote on Sun, Jan 9, 2022 at 19:44:
>
>> When the source is kafka, the maximum parallelism is determined by the
>> number of kafka partitions. Is there a way to increase the parallelism of
>> the whole job without adding kafka partitions?