Re: Wrong format when passing arguments with space

2022-03-31 Thread 姜鑫
Hi Kevin,

I noticed that the two quotes around your time string look different. Could you 
confirm whether that is a typo?

Best,
Xin 


> On Mar 28, 2022, at 11:58 AM, Kevin Lee wrote:
> 
> Flink version : 1.13
> 
> Bug:
> When I pass an argument containing a space using single quotes,
> the main function receives the argument wrapped in double quotes.
> 
> example:
> ./bin/flink run -c com.lmk.QuotaTest --rate 10 --time ''2022-03-28 11:53:21"
> 
> The main function gets these parameters:
> 
> 1-rate
> 2---10
> 3-time
> 4---"2022-03-28 11:53:21"
> 
> 
> I think the flink shell should remove the double quotes around "2022-03-28 11:53:21"
> 
> 
> Hope to get your reply asap



Re:Re: flink jdbc source oom

2022-03-31 Thread Michael Ran
Custom SQL result sets were proposed back then, but the community rejected that approach. 
Functionally, though, we did implement custom SQL result sets ourselves for joins and similar 
operations; it has clear advantages for large data sets and for scenarios like sorting and deduplication.
On 2022-04-01 10:12:55, "Lincoln Lee" wrote:
>@Peihui The community's current jdbc table source implements these interfaces:
>ScanTableSource,
>LookupTableSource,
>SupportsProjectionPushDown,
>SupportsLimitPushDown
>
>The lookup table source serves kv lookup queries for dimension tables, while the scan
>table source supports projection and limit pushdown. If you need other pushdowns,
>e.g. filter/aggregate pushdown for pre-filtering, you can try extending the connector yourself.
>
>
>Best,
>Lincoln Lee
>
>
>r pp wrote on Thu, Mar 31, 2022 at 18:40:
>
>> Hi, your question isn't entirely clear to me. When you say the data volume is very large, is that per day or per second? And in what way does the source fall short?
>>
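
To illustrate the pushdowns mentioned above, a minimal sketch (the table name, schema,
and connection settings are made up) in which the planner can push the column projection
and the LIMIT into the JDBC scan, while other predicates are still evaluated inside Flink:

 CREATE TABLE orders_jdbc (
  id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10, 2),
  created_at TIMESTAMP(3)
 )
 WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://localhost:3306/shop',
 'table-name' = 'orders',
 'username' = '...',
 'password' = '...'
 );

 -- id/amount (projection) and LIMIT 1000 can be pushed into the scan;
 -- the amount > 100 filter runs in Flink unless filter pushdown is added to the connector.
 SELECT id, amount FROM orders_jdbc WHERE amount > 100 LIMIT 1000;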


Re: Flink SQL with the Kafka connector (custom CDC format): data with delete semantics is not reflected in the computation

2022-03-31 Thread Guo Thompson
The images can't be seen.

赵旭晨 wrote on Tue, Mar 15, 2022 at 12:25:

> Flink version: 1.14.3. The scenario is as follows:
> sql:
> set table.exec.state.ttl=1 day;
> describe t_k_chargeorder;
> describe t_k_appointment;
> SELECT
> ReportTime,
> sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount )
> kpitotalcount,
> sum( InsertActualPriceCount ) InsertActualPriceCount,
> sum( InsertAppointmentCount ) InsertAppointmentCount,
> sum( InsertChargeOrderCount ) InsertChargeOrderCount,
> now() LastUpdatedDT
> from
> (
> SELECT
> DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' ) ReportTime,
> sum( actualprice ) InsertActualPriceCount,
> 0 InsertShortMessageCount,
> 0 InsertAppointmentCount,
> 0 InsertImageCount,
> 0 InsertChargeOrderCount,
> 0 InsertPerioExamCount,
> 0 InsertMedicalCount,
> 0 InsertPatientCount,
> 0 InsertGeneralExamCount,
> 0 InsertFollowupCount
> FROM
> --effective_chargeorder t
> (SELECT
> o.recordcreatedtime,
> o.recordcreateduser,
> o.status,
> o._is_delete,
> o.appointmentid,
> o.id,
> o.tenantid,
> o.actualprice,
> o.proc_time,
> t.Name,
> t.IsInactive
> FROM
> t_k_chargeorder AS o
> INNER JOIN t_dental_tenant FOR SYSTEM_TIME AS OF o.proc_time AS t
> ON o.tenantid = t.Id
> WHERE
> t.IsInactive = '0'
> AND o.recordcreateduser > 0
> AND o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' )
> AND o._is_delete = '0'
> AND o.appointmentid > 0) t
> WHERE
> recordcreatedtime BETWEEN concat( DATE_FORMAT( now() , 'yyyy-MM-dd' ),
> ' 00:00:00' )
> AND now()
> GROUP BY
> DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' )
> ) a
> group by ReportTime;
>
> The DAG is as follows (the screenshots did not come through):
> Inserts and updates in the business database are picked up and produce correct results.
> For delete semantics, the Kafka CDC format does consume the delete records,
>
> but the SQL result is not reduced accordingly, as shown below:
> after the delete the value should go from 150 to 100, yet nothing happens; it feels like an internal operator filtered out the -D record.
> Could someone please shed some light on this?


Re: flink jdbc source oom

2022-03-31 Thread Guo Thompson
Are the filter conditions really not pushed down?

Peihui He wrote on Thu, Mar 31, 2022 at 10:33:

> Hi, all
>
> A question for everyone: when reading data from TiDB with the Flink JDBC connector, is there a way to filter at the database level based on query conditions?
> When the data volume is very large, say tens of millions to hundreds of millions of rows, the Flink JDBC source really struggles.
>
>
> Best Regards!
>


Question about community collaboration options

2022-03-31 Thread Hao t Chang
Hi,

I have been looking into Flink and joined the mailing lists recently. I am 
trying to figure out how the community members collaborate. For example, is 
there Slack channels or Weekly sync up calls where the community members can 
participate and talk with each other to brainstorm, design, and make decisions?

Ted


Re: How to debug Metaspace exception?

2022-03-31 Thread John Smith
Ok, so if there's a leak and I manually stop the job and restart it from the
UI multiple times, I won't see the issue because the classes are
unloaded correctly?


On Thu, Mar 31, 2022 at 9:20 AM huweihua  wrote:

>
> The difference is that manually canceling the job stops the JobMaster, but
> automatic failover keeps the JobMaster running. But looking at the TaskManager,
> it doesn't make much difference
>
>
> On Mar 31, 2022, at 4:01 AM, John Smith wrote:
>
> Also if I manually cancel and restart the same job over and over is it the
> same as if flink was restarting a job due to failure?
>
> I.e.: when I click "Cancel Job" in the UI, is the job completely unloaded vs.
> when the job scheduler restarts a job because of whatever reason?
>
> Like this I'll stop and restart the job a few times or maybe I can trick
> my job to fail and have the scheduler restart it. Ok let me think about
> this...
>
> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>
>> So if I run the same jobs in my dev env will I still be able to see a
>> similar dump?
>>
>> I think running the same job in dev should be reproducible, maybe you can
>> have a try.
>>
>>  If not, I would have to wait for a low-volume time to do it on production.
>> Also, if I recall, the dump is as big as the JVM memory, right? So if I have
>> 10GB configured for the JVM, the dump will be a 10GB file?
>>
>> Yes, jmap will pause the JVM; the length of the pause depends on the size of
>> the dump. You can use "jmap -dump:live" to dump only the reachable objects,
>> which only needs a brief pause
>>
>>
>>
>> On Mar 30, 2022, at 9:47 PM, John Smith wrote:
>>
>> I have 3 task managers (see config below). There is a total of 10 jobs with
>> 25 slots being used.
>> The jobs are 100% ETL, i.e. they load JSON, transform it, and push it to
>> JDBC; only 1 of the 10 jobs pushes to an Apache Ignite cluster.
>>
>> For jmap: I know that it will pause the task manager. So if I run the
>> same jobs in my dev env, will I still be able to see a similar dump? I
>> assume so. If not, I would have to wait for a low-volume time to do it on
>> production. Also, if I recall, the dump is as big as the JVM memory, right? So
>> if I have 10GB configured for the JVM, the dump will be a 10GB file?
>>
>>
>> # Operating system has 16GB total.
>> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>>
>> cluster.evenly-spread-out-slots: true
>>
>> taskmanager.memory.flink.size: 10240m
>> taskmanager.memory.jvm-metaspace.size: 2048m
>> taskmanager.numberOfTaskSlots: 16
>> parallelism.default: 1
>>
>> high-availability: zookeeper
>> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
>> high-availability.zookeeper.quorum: ...
>> high-availability.zookeeper.path.root: /flink_1_14
>> high-availability.cluster-id: /flink_1_14_cluster_0001
>>
>> web.upload.dir: /mnt/flink/uploads/flink_1_14
>>
>> state.backend: rocksdb
>> state.backend.incremental: true
>> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
>> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>>
>> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:
>>
>>> Hi, John
>>>
>>> Could you tell us your application scenario? Is it a Flink session
>>> cluster with a lot of jobs?
>>>
>>> Maybe you can try to dump the memory with jmap and use tools such as MAT
>>> to analyze whether there are abnormal classes and classloaders
>>>
>>>
>>> > On Mar 30, 2022, at 6:09 AM, John Smith wrote:
>>> >
>>> > Hi running 1.14.4
>>> >
>>> > My task manager still fails with java.lang.OutOfMemoryError:
>>> Metaspace. The metaspace out-of-memory error has occurred. This can mean
>>> two things: either the job requires a larger size of JVM metaspace to load
>>> classes or there is a class loading leak.
>>> >
>>> > I have 2GB of metaspace configured:
>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>> >
>>> > But the task nodes still fail.
>>> >
>>> > When looking at the UI metrics, the metaspace starts low. Now I see
>>> 85% usage. It seems to be a class loading leak at this point, how can we
>>> debug this issue?
>>>
>>>
>>
>
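
For reference, the jmap invocation suggested above could look like this (a sketch; the
output path is arbitrary and the TaskManager PID is a placeholder):

# dump only live (reachable) objects to keep the pause short, then analyze the .hprof in MAT
jmap -dump:live,format=b,file=/tmp/taskmanager-heap.hprof <taskmanager-pid>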


[DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-03-31 Thread Ryan van Huuksloot
Hello!

*Problem:*
I am connecting to a Kafka Source with the Watermark Strategy below.

val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
  .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
    override def extractTimestamp(element: StarscreamEventCounter_V1,
                                  recordTimestamp: Long): Long =
      element.envelopeTimestamp
  })

The Watermarks are correctly getting assigned.
However, when a reduce function is used the window never terminates
because the `ctx.getCurrentWatermark()` returns the default value of
`-9223372036854775808` in perpetuity.

This is the stream code:

stream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).reduce(_ + _)

The reduce uses this overloaded operator:

@JsonIgnoreProperties(ignoreUnknown = true)
case class StarscreamEventCounter_V1(
  envelopeTimestamp: Long,
  numberOfEvents: Int = 1
) {
  def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = {
    StarscreamEventCounter_V1(this.envelopeTimestamp.min(that.envelopeTimestamp),
                              that.numberOfEvents + this.numberOfEvents)
  }
}


*Attempt to Solve:*
1. Validate that the Watermark is set on the source
a. Set a custom trigger to emit a watermark on each event just in case
2. Test with aggregate / process functions
a. Both other functions work properly - window closes and emits to a
PrintSink
3. Change Watermark Generator to a custom generator
a. Also change time horizons and let run for 1 day - window never
closes due to the watermark being stuck at the min default. The sink never
receives the data but the UI says there are records being output!

*Hypothesis:*
The output id of a reduce function is causing an upstream issue where the
watermark is no longer assigned to the Window. I haven't been able to lock
down what exactly is causing the issue though. My thought is that it might
be a bug given it works for Aggregate/Process.
It could be a bug in the IndexedCombinedWatermarkStatus, the partial
watermarks should not be the min default value when I set the watermark per
event - this is what I will be looking into until I hear back. I validated
that the watermark is set correctly in CombinedWatermarkStatus.

*Tools:*
- Flink 1.14.3
- Scala
- DataStream API

Any assistance would be great! Happy to provide more context or
clarification if something isn't clear!

Thanks!

Ryan van Huuksloot
Data Developer | Data Platform Engineering | Streaming Capabilities
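
For comparison, a minimal sketch of the aggregate-based variant mentioned in point 2
of "Attempt to Solve" (it reuses the StarscreamEventCounter_V1 case class and the
imports already shown above; the class and variable names are otherwise assumptions),
which reportedly lets the window close as expected:

import org.apache.flink.api.common.functions.AggregateFunction

class CounterAggregate
    extends AggregateFunction[StarscreamEventCounter_V1,
                              StarscreamEventCounter_V1,
                              StarscreamEventCounter_V1] {
  // Start with zero events and a "max" timestamp so the first add() picks up the
  // element's timestamp via the min() inside the + operator.
  override def createAccumulator(): StarscreamEventCounter_V1 =
    StarscreamEventCounter_V1(Long.MaxValue, 0)
  override def add(value: StarscreamEventCounter_V1,
                   acc: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
    acc + value
  override def getResult(acc: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = acc
  override def merge(a: StarscreamEventCounter_V1,
                     b: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = a + b
}

stream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .aggregate(new CounterAggregate)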



Re: how to achieve sideOutputLateData() in FlinkSQL?

2022-03-31 Thread liuxiangcao
Thanks a lot for the help!  Yu'an and Martijn.

To share and confirm my understanding, the recipe using CURRENT_WATERMARK
to get late data will return all data arriving later than the defined
bounded out-of-orderness, without consideration of window closing time.
In comparison, WindowedStream.sideOutputLateData(OutputTag) will only
return data that arrives after its window is already closed.
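
Concretely, a minimal sketch of that recipe (hypothetical source table `events` with
event-time attribute `ts` and a watermark declared on it; `late_events` is an assumed
sink table with the same schema): rows at or behind the current watermark are the late ones.

INSERT INTO late_events
SELECT *
FROM events
WHERE CURRENT_WATERMARK(ts) IS NOT NULL
  AND ts <= CURRENT_WATERMARK(ts);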

On Thu, Mar 31, 2022 at 2:26 AM Martijn Visser 
wrote:

> Hi,
>
> The only thing you currently can do is filter out late data using the
> CURRENT_WATERMARK function since Flink 1.14. There's a SQL Cookbook recipe
> on this function which can be found at
> https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
>
> On Thu, 31 Mar 2022 at 11:00, yu'an huang  wrote:
>
>> Hi, in my understanding, Flink only supports getting late data via side
>> output in the DataStream API currently. For Table API/SQL, unfortunately, late
>> events will always be dropped.
>>
>> You can see this link as reference:
>> https://stackoverflow.com/questions/60218235/using-event-time-with-lateness-in-flink-sql-windows
>>
>>
>>
>> > On 31 Mar 2022, at 5:38 AM, liuxiangcao 
>> wrote:
>> >
>> > Hi Flink community,
>> >
>> > In the Flink DataStream Java API, a user can get data that was discarded
>> as late using WindowedStream.sideOutputLateData(OutputTag) (see [1]).  I'm
>> wondering what is the best way for user to achieve this in Flink SQL?
>> >
>> > For background, we are providing pure sql deployment to our internal
>> users which means user won't be using Flink Table API directly. They will
>> write Flink SQL script, with Java only used for UDF.
>> >
>> > Would appreciate if any one here can share your insights or
>> experiences. Thanks!
>> >
>> > [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>> >
>> >
>> >
>>
>>

-- 
Best Wishes & Regards
Xiangcao Liu


Could you please give me a hand about json object in flink sql

2022-03-31 Thread wang
Hi dear engineer,


Thanks so much for your precious time reading my email!
Recently I've been working with Flink SQL (version 1.13) in my project and 
encountered a problem with JSON-formatted data; I hope you can take a look, 
thanks! Below is a description of my issue.


I use kafka as source and sink, I define kafka source table like this:


 CREATE TABLE TableSource (
  schema STRING,
  payload ROW(
  `id` STRING,
  `content` STRING
 )
 )
 WITH (
 'connector' = 'kafka',
 'topic' = 'topic_source',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'all_gp',
 'scan.startup.mode' = 'group-offsets',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
 );


Define the kafka sink table like this:


 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` STRING NOT NULL
 )
 WITH (
 'connector' = 'kafka',
 'topic' = 'topic_sink',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
);




Then insert into TableSink with data from TableSource:
INSERT INTO TableSink SELECT id, content FROM TableSource;


Then I use "kafka-console-producer.sh" to produce data below into topic 
"topic_source" (TableSource):
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"name\":\"Jone\",\"age\":20}"}}




Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh, and the 
output is:
{"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}


But what I want here is {"id":"1","content": {"name":"Jone","age":20}}
I want the value of "content" to be a JSON object, not a JSON string.


What's more, the format of "content" in TableSource is not fixed; it can be 
any JSON-formatted (or JSON-array-formatted) string, such as:
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}




So my question is: how can I transform a JSON-formatted string (like 
"{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object 
(like {"name":"Jone","age":20})?




Thanks && Regards,
Hunk
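
One possible workaround on 1.13 (a sketch, not necessarily the only option: it assembles
the output line manually with CONCAT and writes it through the single-column 'raw'
format so the JSON sink format cannot re-escape the string; it assumes `payload.content`
always holds valid JSON and that `id` needs no escaping):

 CREATE TABLE TableSinkRaw (
  json_line STRING
 )
 WITH (
 'connector' = 'kafka',
 'topic' = 'topic_sink',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'raw'
 );

 INSERT INTO TableSinkRaw
 SELECT CONCAT('{"id":"', payload.id, '","content":', payload.content, '}')
 FROM TableSource;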



Re: How to debug Metaspace exception?

2022-03-31 Thread huweihua

The difference is that manually canceling the job stops the JobMaster, but 
automatic failover keeps the JobMaster running. But looking at the TaskManager, it 
doesn't make much difference


> On Mar 31, 2022, at 4:01 AM, John Smith wrote:
> 
> Also if I manually cancel and restart the same job over and over is it the 
> same as if flink was restarting a job due to failure?
> 
> I.e.: when I click "Cancel Job" in the UI, is the job completely unloaded vs. 
> when the job scheduler restarts a job because of whatever reason?
> 
> Like this I'll stop and restart the job a few times or maybe I can trick my 
> job to fail and have the scheduler restart it. Ok let me think about this...
> 
> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华 wrote:
>> So if I run the same jobs in my dev env will I still be able to see a 
>> similar dump?
> I think running the same job in dev should be reproducible, maybe you can 
> have a try.
> 
>>  If not, I would have to wait for a low-volume time to do it on production. 
>> Also, if I recall, the dump is as big as the JVM memory, right? So if I have 
>> 10GB configured for the JVM, the dump will be a 10GB file?
> 
> Yes, jmap will pause the JVM; the length of the pause depends on the size of the dump. 
> You can use "jmap -dump:live" to dump only the reachable objects, which only 
> needs a brief pause
> 
> 
> 
>> On Mar 30, 2022, at 9:47 PM, John Smith wrote:
>> 
>> I have 3 task managers (see config below). There is a total of 10 jobs with 25 
>> slots being used.
>> The jobs are 100% ETL, i.e. they load JSON, transform it, and push it to JDBC; 
>> only 1 of the 10 jobs pushes to an Apache Ignite cluster.
>> 
>> For jmap: I know that it will pause the task manager. So if I run the same 
>> jobs in my dev env, will I still be able to see a similar dump? I assume 
>> so. If not, I would have to wait for a low-volume time to do it on production. 
>> Also, if I recall, the dump is as big as the JVM memory, right? So if I have 
>> 10GB configured for the JVM, the dump will be a 10GB file?
>> 
>> 
>> # Operating system has 16GB total.
>> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>> 
>> cluster.evenly-spread-out-slots: true
>> 
>> taskmanager.memory.flink.size: 10240m
>> taskmanager.memory.jvm-metaspace.size: 2048m
>> taskmanager.numberOfTaskSlots: 16
>> parallelism.default: 1
>> 
>> high-availability: zookeeper
>> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
>> high-availability.zookeeper.quorum: ...
>> high-availability.zookeeper.path.root: /flink_1_14
>> high-availability.cluster-id: /flink_1_14_cluster_0001
>> 
>> web.upload.dir: /mnt/flink/uploads/flink_1_14
>> 
>> state.backend: rocksdb
>> state.backend.incremental: true
>> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
>> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>> 
>> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华 wrote:
>> Hi, John
>> 
>> Could you tell us your application scenario? Is it a Flink session cluster 
>> with a lot of jobs?
>> 
>> Maybe you can try to dump the memory with jmap and use tools such as MAT to 
>> analyze whether there are abnormal classes and classloaders
>> 
>> 
>> > On Mar 30, 2022, at 6:09 AM, John Smith wrote:
>> > 
>> > Hi running 1.14.4
>> > 
>> > My task manager still fails with java.lang.OutOfMemoryError: Metaspace. 
>> > The metaspace out-of-memory error has occurred. This can mean two things: 
>> > either the job requires a larger size of JVM metaspace to load classes or 
>> > there is a class loading leak.
>> > 
>> > I have 2GB of metaspace configured: taskmanager.memory.jvm-metaspace.size: 
>> > 2048m
>> > 
>> > But the task nodes still fail.
>> > 
>> > When looking at the UI metrics, the metaspace starts low. Now I see 85% 
>> > usage. It seems to be a class loading leak at this point, how can we debug 
>> > this issue?
>> 
> 



Re: flink jdbc source oom

2022-03-31 Thread r pp
Hi, your question isn't entirely clear to me. When you say the data volume is very large, is that per day or per second? And in what way does the source fall short?


Re: how to achieve sideOutputLateData() in FlinkSQL?

2022-03-31 Thread Martijn Visser
Hi,

The only thing you currently can do is filter out late data using the
CURRENT_WATERMARK function since Flink 1.14. There's a SQL Cookbook recipe
on this function which can be found at
https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Thu, 31 Mar 2022 at 11:00, yu'an huang  wrote:

> Hi, in my understanding, Flink only supports getting late data via side
> output in the DataStream API currently. For Table API/SQL, unfortunately, late
> events will always be dropped.
>
> You can see this link as reference:
> https://stackoverflow.com/questions/60218235/using-event-time-with-lateness-in-flink-sql-windows
>
>
>
> > On 31 Mar 2022, at 5:38 AM, liuxiangcao  wrote:
> >
> > Hi Flink community,
> >
> > In the Flink DataStream Java API, a user can get data that was discarded
> as late using WindowedStream.sideOutputLateData(OutputTag) (see [1]).  I'm
> wondering what is the best way for user to achieve this in Flink SQL?
> >
> > For background, we are providing pure sql deployment to our internal
> users which means user won't be using Flink Table API directly. They will
> write Flink SQL script, with Java only used for UDF.
> >
> > Would appreciate if any one here can share your insights or experiences.
> Thanks!
> >
> > [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
> >
> >
> >
>
>


Re: how to achieve sideOutputLateData() in FlinkSQL?

2022-03-31 Thread yu'an huang
Hi, in my understanding, Flink only supports getting late data via side output in 
the DataStream API currently. For Table API/SQL, unfortunately, late events will 
always be dropped.

You can see this link as reference: 
https://stackoverflow.com/questions/60218235/using-event-time-with-lateness-in-flink-sql-windows



> On 31 Mar 2022, at 5:38 AM, liuxiangcao  wrote:
> 
> Hi Flink community,
> 
> In the Flink DataStream Java API, a user can get data that was discarded as 
> late using WindowedStream.sideOutputLateData(OutputTag) (see [1]).  I'm 
> wondering what is the best way for user to achieve this in Flink SQL?  
> 
> For background, we are providing pure sql deployment to our internal users 
> which means user won't be using Flink Table API directly. They will write 
> Flink SQL script, with Java only used for UDF.  
> 
> Would appreciate if any one here can share your insights or experiences. 
> Thanks! 
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
> 
> 
>