Async code inside Flink Sink

2024-04-17 Thread Jacob Rollings
Hello,

I have a use case where I need to delete a cache file after a
successful sink operation (writing to the DB). My Flink pipeline is built using
Java. I am contemplating using Java's CompletableFuture.runAsync() to perform
the file deletion. I am wondering what issues this might cause in
terms of thread management and the processing of subsequent events.
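
For concreteness, something like the sketch below is what I have in mind for the
first case (rough and untested; MyEvent, cacheFilePath and writeToDb() are just
placeholders). The deletion is submitted to a small dedicated executor so that
invoke() returns immediately and neither the task thread nor the common
ForkJoinPool gets blocked:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Placeholder event type carrying the path of the cache file to delete. */
class MyEvent {
    String cacheFilePath;
}

public class DbSinkWithAsyncCleanup extends RichSinkFunction<MyEvent> {

    // Dedicated pool so the deletion never runs on the task thread or the common ForkJoinPool.
    private transient ExecutorService cleanupExecutor;

    @Override
    public void open(Configuration parameters) {
        cleanupExecutor = Executors.newSingleThreadExecutor();
    }

    @Override
    public void invoke(MyEvent event, Context context) {
        writeToDb(event); // the actual sink work stays synchronous

        // Fire-and-forget deletion; invoke() returns and the next event is processed.
        Path cacheFile = Paths.get(event.cacheFilePath);
        CompletableFuture
                .runAsync(() -> deleteQuietly(cacheFile), cleanupExecutor)
                .exceptionally(t -> { /* log and move on */ return null; });
    }

    @Override
    public void close() {
        if (cleanupExecutor != null) {
            cleanupExecutor.shutdown();
        }
    }

    private void writeToDb(MyEvent event) { /* placeholder for the real DB write */ }

    private void deleteQuietly(Path file) {
        try {
            Files.deleteIfExists(file);
        } catch (Exception e) {
            // log only; never let cleanup failures affect event processing
        }
    }
}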

Also related to the same use case, in another Flink pipeline I might need
to do this cache file deletion on a timer. For example, every five
minutes I have to check for cache files that are older than the currently
open cache file that is serving data into the Sink function. All old
cache files that are in closed status need to be deleted in a timely manner.

All this deletion has to happen asynchronously without blocking the flow of
events from Flink source to sink.
Also, based on the requirements, I cannot make the whole Sink operation async; I
have to make only the file-based cache deletion async inside the Sink
function.
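
For the timed variant, I was thinking of something along these lines inside the
sink, using a plain JDK scheduler rather than anything Flink-specific (again a
rough, untested sketch; the cache directory, currentCacheFile() and isClosed()
are placeholders, and MyEvent is the same placeholder type as above):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// MyEvent is the same placeholder event type as in the previous sketch.
public class DbSinkWithTimedCleanup extends RichSinkFunction<MyEvent> {

    private final String cacheDir = "/tmp/cache"; // placeholder location

    private transient ScheduledExecutorService cleanupScheduler;

    @Override
    public void open(Configuration parameters) {
        cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
        // Every five minutes, delete closed cache files other than the one currently in use.
        cleanupScheduler.scheduleAtFixedRate(this::deleteStaleCacheFiles, 5, 5, TimeUnit.MINUTES);
    }

    @Override
    public void invoke(MyEvent event, Context context) {
        writeToDb(event); // normal synchronous sink work; cleanup runs on its own thread
    }

    private void deleteStaleCacheFiles() {
        Path current = currentCacheFile(); // placeholder: the cache file still being written to
        try (Stream<Path> files = Files.list(Paths.get(cacheDir))) {
            files.filter(p -> !p.equals(current))
                 .filter(this::isClosed) // placeholder check for "closed" status
                 .forEach(this::deleteQuietly);
        } catch (IOException e) {
            // log; the scheduler tries again on the next tick
        }
    }

    private void deleteQuietly(Path file) {
        try {
            Files.deleteIfExists(file);
        } catch (IOException e) {
            // log and continue; never block or fail event processing
        }
    }

    @Override
    public void close() {
        if (cleanupScheduler != null) {
            cleanupScheduler.shutdownNow();
        }
    }

    private void writeToDb(MyEvent event) { /* placeholder for the real DB write */ }

    private Path currentCacheFile() { /* placeholder */ return null; }

    private boolean isClosed(Path file) { /* placeholder */ return true; }
}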

Does Flink support timers or async blocks?

Any input will be highly appreciated.

Thanks,
JB


Re: Parallelism for auto-scaling, memory for auto-tuning - Flink operator

2024-04-17 Thread Zhanghao Chen
If you have some prior experience, I'd recommend setting a reasonable parallelism 
and TM resource spec first, to give the autotuner a good starting point. 
Usually, the autoscaler can tune your jobs well within a few attempts (<= 3). As 
for `pekko.ask.timeout`, the default value should be sufficient in most cases.

Best,
Zhanghao Chen

From: Maxim Senin via user 
Sent: Thursday, April 18, 2024 5:56
To: user@flink.apache.org 
Subject: Parallelism for auto-scaling, memory for auto-tuning - Flink operator


Hi.

Does it make sense to specify `parallelism` for the task managers or the `job`, 
and, similarly, to specify the memory for the task managers, or is it better 
to leave it to the autoscaler and autotuner to pick the best values? How many times 
would the autoscaler need to restart the task managers before it picks the right 
values? Does `pekko.ask.timeout` need to be large enough for the task managers to 
reach the RUNNING state given all the restarts?



Cheers,

Maxim



Re: What should be considered when using Flink's unified stream-batch processing for data reconciliation in a real-time data warehouse?

2024-04-17 Thread casel.chen
Has anyone tried this in practice? Could you give some advice? Thank you!
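
For concreteness, a rough sketch of the batch verification job I have in mind
(untested; the DDL and the shared INSERT ... SELECT are only indicated by
placeholder comments):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchCheckJob {
    public static void main(String[] args) {
        // Same Flink SQL as the streaming job, but executed in batch mode for the
        // periodic (every 10/20/30 minutes) verification run.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Placeholders: register the Hologres/StarRocks-backed copies of the source
        // tables and a new batch result table via CREATE TABLE statements here, then
        // submit the streaming job's business SQL unchanged, e.g.:
        //   tEnv.executeSql(reusedBusinessSql);  // the shared INSERT ... SELECT
        tEnv.executeSql("SHOW TABLES").print();   // trivial call so the sketch runs as-is
    }
}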

At 2024-04-15 11:15:34, "casel.chen" wrote:
>I have recently been researching data quality assurance for our Flink real-time
>data warehouse. We need to periodically (every 10/20/30 minutes) run batch jobs to
>verify the data produced by the real-time warehouse. The traditional approach is to
>run these checks as Spark batch jobs, e.g. the data quality module of Apache
>DolphinScheduler. The biggest drawback of that approach is that the Flink SQL
>business logic has to be rewritten in Spark SQL, which makes it hard to keep the two
>consistent. So I am considering whether Flink's unified stream-batch capability can
>be used to reuse the Flink SQL as-is: switch the data source from CDC or Kafka to a
>Hologres or StarRocks table, create a new batch result table, and then simply
>compare the real-time result table with the batch result table over the same time
>range. A few questions:
>1. Can the watermark, process_time and event_time fields defined in the original
>real-time Flink SQL tables be reused in batch mode?
>2. Can streaming dual-stream joins such as interval join and temporal join be used
>in batch mode?
>3. Can the window functions used in the streaming job be reused in batch mode?
>4. What other things should I pay attention to?


Parallelism for auto-scaling, memory for auto-tuning - Flink operator

2024-04-17 Thread Maxim Senin via user
Hi.

Does it make sense to specify `parallelism` for the task managers or the `job`, 
and, similarly, to specify the memory for the task managers, or is it better 
to leave it to the autoscaler and autotuner to pick the best values? How many times 
would the autoscaler need to restart the task managers before it picks the right 
values? Does `pekko.ask.timeout` need to be large enough for the task managers to 
reach the RUNNING state given all the restarts?

Cheers,
Maxim



Re: Understanding default firings in case of allowed lateness

2024-04-17 Thread Sachin Mittal
Hi Xuyang,

So if I go with the side-output approach, my pipeline would look something like
this:

final OutputTag<MyData> lateOutputTag = new OutputTag<MyData>("late-data"){}; // MyData = element type (placeholder)

SingleOutputStreamOperator<MyData> reducedDataStream =
    dataStream
        .keyBy(new MyKeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .allowedLateness(Time.seconds(180))
        .sideOutputLateData(lateOutputTag)
        .reduce(new MyDataReducer());

DataStream<MyData> lateStream = reducedDataStream.getSideOutput(lateOutputTag);

lateStream
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(480))
    .reduce(new MyDataReducer());


So basically I collect all the late data into a separate stream and apply the
same transformations to it again, to get reduced data from this late data.
Is this the correct way to obtain reduced data from the late data only?

Also, I have a few more queries:
1. For this late-records stream not to drop records, would I have to set an
allowed lateness larger than the one I set on the first stream transformation?
   Basically, do I need to set any allowed lateness on the window operation
of the late data stream at all if I want to reduce the late records the same
way as the in-time records?

2. Also, when I collect late data as a side output, would the reduce function
on the main stream now only operate on in-time records, with no late records
included in its subsequent reduced output?

Basically after this the output of reduced data will only contain:

[ reduceData (d1, d2, d3) ]

and not anything like:
reducedData(d1, d2, d3, late d4, late d5) or reducedData(d1, d2, d3, late
d4, late d5, late d6)

And the transformation of the late data stream would now produce reduced data like:

[ reducedData(late d4, late d5, late d6) ]

Thanks
Sachin


On Wed, Apr 17, 2024 at 4:05 PM Xuyang  wrote:

> Hi, Sachin.
>
> IIUC, it is in the second situation you listed, that is:
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late
> d6) ].
> However, because of `table.exec.emit.late-fire.delay`, it could also be
> such as
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5),
> reducedData(d1, d2, d3, late d4, late d5, late d6) ]
>
> Actually, allowed lateness (table.exec.emit.allow-lateness) controls the
> point after which the window output will no longer be updated, and lets the
> framework automatically clear the corresponding state.
>
> > Also if I want the reduced data from late records to not include the
> data emitted within the window bounds, how can I do the same ?
> or if this is handled as default case ?
>
> Maybe side output[1] can help you to collect the late data and re-compute
> them.
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/
>
> --
> Best!
> Xuyang
>
>
> At 2024-04-17 16:56:54, "Sachin Mittal"  wrote:
>
> Hi,
>
> Suppose my pipeline is:
>
> data
> .keyBy(new MyKeySelector())
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .allowedLateness(Time.seconds(180))
> .reduce(new MyDataReducer())
>
>
> So I wanted to know whether the final output stream would contain reduced
> data at the end of the window and also another reduced result at the end of
> the allowed lateness?
> If that is the case, would the reduced data at the end of the allowed
> lateness also include the non-late records, or would it only include reduced
> data from the late records?
>
> Example
>
> If I have data in sequence:
>
> [window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end
> of allowed lateness]
>
> The resultant stream after window and reduce operation would be:
>
> [ reduceData (d1, d2, d3), reducedData(late d4, late d5, late d6) ]
>
> or
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late
> d6) ]
>
> or something else ?
>
> Also, if I want the reduced data from late records to not include the data
> emitted within the window bounds, how can I do that?
> Or is this handled by default?
>
> Thanks
> Sachin
>


Re:Understanding default firings in case of allowed lateness

2024-04-17 Thread Xuyang
Hi, Sachin.


IIUC, it is in the second situation you listed, that is:
[ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ].
However, because of `table.exec.emit.late-fire.delay`, it could also be such as
[ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5),
reducedData(d1, d2, d3, late d4, late d5, late d6) ]


Actually, allowed lateness (table.exec.emit.allow-lateness) controls the point
after which the window output will no longer be updated, and lets the framework
automatically clear the corresponding state.


> Also, if I want the reduced data from late records to not include the data
> emitted within the window bounds, how can I do that?
> Or is this handled by default?


Maybe side output[1] can help you to collect the late data and re-compute them.
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/



--

Best!
Xuyang




At 2024-04-17 16:56:54, "Sachin Mittal"  wrote:

Hi,



Suppose my pipeline is:


data
.keyBy(new MyKeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.allowedLateness(Time.seconds(180))
.reduce(new MyDataReducer())


So I wanted to know whether the final output stream would contain reduced data 
at the end of the window and also another reduced result at the end of the 
allowed lateness?
If that is the case, would the reduced data at the end of the allowed lateness 
also include the non-late records, or would it only include reduced data from 
the late records?


Example


If I have data in sequence:


[window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end of 
allowed lateness] 


The resultant stream after window and reduce operation would be:


[ reduceData (d1, d2, d3), reducedData(late d4, late d5, late d6) ]


or
[ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ]


or something else ?


Also, if I want the reduced data from late records to not include the data 
emitted within the window bounds, how can I do that?
Or is this handled by default?


Thanks
Sachin


Understanding default firings in case of allowed lateness

2024-04-17 Thread Sachin Mittal
Hi,

Suppose my pipeline is:

data
.keyBy(new MyKeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.allowedLateness(Time.seconds(180))
.reduce(new MyDataReducer())


So I wanted to know whether the final output stream would contain reduced data
at the end of the window and also another reduced result at the end of the
allowed lateness?
If that is the case, would the reduced data at the end of the allowed lateness
also include the non-late records, or would it only include reduced data from
the late records?

Example

If I have data in sequence:

[window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end
of allowed lateness]

The resultant stream after window and reduce operation would be:

[ reduceData (d1, d2, d3), reducedData(late d4, late d5, late d6) ]

or
[ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late
d6) ]

or something else ?

Also, if I want the reduced data from late records to not include the data
emitted within the window bounds, how can I do that?
Or is this handled by default?

Thanks
Sachin


Re: Elasticsearch8 example

2024-04-17 Thread Hang Ruan
Hi Tauseef.

I see that support for Elasticsearch 8[1] will be released
in elasticsearch-3.1.0, so there are no docs for the elasticsearch8 connector yet.
We can learn how to use it from the tests[2] until the docs are ready.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-26088
[2]
https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java

Tauseef Janvekar wrote on Wed, Apr 17, 2024 at 01:12:

> Dear Team,
>
> Can anyone please share an example for flink-connector-elasticsearch8?
> I found this connector added on GitHub, but no proper documentation is
> available for it yet.
>
> It would be of great help if some sample code for the above connector could
> be provided.
>
> Thanks,
> Tauseef
>


Re: Table Source from Parquet Bug

2024-04-17 Thread Hang Ruan
Hi, David.

Have you added the parquet format[1] dependency to your project?
It seems that the class ParquetColumnarRowInputFormat cannot be found.
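
If it helps, IIRC that class ships in the org.apache.flink:flink-parquet
artifact, so that is likely the missing dependency. Once it is on the classpath,
a parquet-backed table can be declared roughly like this (hypothetical schema
and path, just as a sketch):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ParquetSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical schema and path; 'format' = 'parquet' needs flink-parquet at runtime.
        tEnv.executeSql(
                "CREATE TABLE parquet_source (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'filesystem',\n"
                        + "  'path' = 'file:///tmp/data/parquet',\n"
                        + "  'format' = 'parquet'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM parquet_source").print();
    }
}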

Best,
Hang

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/parquet/

Sohil Shah wrote on Wed, Apr 17, 2024 at 04:42:

> Hello David,
>
> Since this is a ClassNotFoundException, you may be missing a dependency.
> Could you share your pom.xml?
>
> Thanks
> -Sohil
> Project: Braineous https://bugsbunnyshah.github.io/braineous/
>
> On Tue, Apr 16, 2024 at 11:25 AM David Silva via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>> Our team would like to leverage Flink but we're running into some issues
>> with reading from a parquet file source. I *think* it's an issue with
>> the Flink API.
>>
>> Could someone please help take a look?
>>
>> We're using *Scala 2.12* & *Flink 1.18.1*
>>
>> I attached a copy of the code, the terminal output, and the flink logs.
>>
>> The issue is @ *MacFlapAggregator.scala:324*, it errors because of:
>> *Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat*
>>
>>
>>- *MacFlapAggregator.scala:206* creates and queries the same exact
>>table successfully though
>>- *MacFlapAggregator.scala:318* If I create the table using a CSV
>>source, it works
>>
>>
>> I also posted in the slack server here
>> https://apache-flink.slack.com/archives/C03G7LJTS2G/p1713209215085589
>>
>> Any help with this would be immensely appreciated; our team has been
>> struggling with this for a couple of days now.
>>
>> Thanks!
>>
>