RE: DI in flink

2023-02-14 Thread Schwalbe Matthias
Hi Yashoda,

I use Spring Boot to set up my job networks and DI-compose streaming components 
like operators/functions, etc.
The important part is that all components need to be serializable in order for this 
to work.
Specific task implementations are a little more difficult to set up in a DI way 
(I have little experience there). If I’m not mistaken, Flink uses factories for this.

Sincere greetings

Thias


From: Yashoda Krishna T 
Sent: Wednesday, February 15, 2023 6:19 AM
To: Austin Cawley-Edwards 
Cc: user 
Subject: Re: DI in flink

Thanks Austin.
I can make use of Rich functions to solve my problem.

Thanks
Yashoda

On Wed, Feb 15, 2023 at 12:42 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
(note: please keep user@flink.apache.org included 
in replies)

Ah, I see. Then no, this is not provided by Flink. When I've used dependency 
injection with Flink in the past, I instantiated everything in the `open()` method 
of the Flink Rich* classes. Could you solve this by having a common base Sink 
class or builder that does the configuring? I'm just wondering why it's 
necessary to solve it in Flink itself.

Best,
Austin

On Tue, Feb 14, 2023 at 11:05 AM Yashoda Krishna T <yashoda.kris...@unbxd.com> wrote:
This is my use case.
I have a sink function to push streaming data to S3. And I have a class lets 
call S3ConnProvider that provides me a connection object to S3, and a class 
lets say S3Util that has functions over S3 which injects S3ConnProvider.
If dependency injection works I can inject S3Util alone in my SinkFunction 
class. If not I have to initialize S3ConnProvider first and then S3Util.
This can become complex if there are too many initializations required 
depending on the use case.


[SUMMARY] Flink 1.17 Release Sync 2/14/2023

2023-02-14 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights from Flink 1.17 release sync on
2/14/2023.

Release testing:

- The original deadline of cross-team testing is Feb 21, 2023 (next
Tuesday). We will monitor the status throughout the week and hopefully
conclude everything before the deadline.
- Please change the status icon to “smiling face” in the release wiki page
if the testing has been completed.
- We always look for volunteers: picking up a cross-team testing task is
much appreciated.

Release management:

- We’ll create a JIRA issue for release management tasks (such as creating
RCs, release notes, and announcements) in order to improve reviewability.
(Thanks @ for the proposal!)

Test instabilities:

- FLINK-31036 was prioritized as a blocker and is being investigated now.

The next release meeting will be on Feb 21, 2023. Feel free to join us!

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126

Best regards,
Leonard, Matthias, Martijn and Qingsheng


Re: Reusing the same OutputTag in multiple ProcessFunctions

2023-02-14 Thread David Anderson
I can't respond to the python-specific aspects of this situation, but
I don't believe you need to use the same OutputTag instance. It should
be enough that the various tag instances involved all have the same
String id. (That's why the id exists.)

David
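The pickling constraint behind the report below, and why David's separate-instances-with-the-same-id approach avoids it, can be illustrated in plain Python (the classes are stand-ins, not the real pyflink API): an object graph that transitively holds a _thread.RLock cannot be pickled, while two independent tag objects that merely agree on the string id can.

```python
import pickle
import threading

class ProcessFnLike:
    """Stand-in for a pyflink ProcessFunction, which must be picklable so
    Flink can ship it to the Python workers."""
    def __init__(self, tag):
        self.tag = tag

# Sharing one tag object that drags in an RLock breaks pickling, matching
# the failure described in the thread:
shared_tag = {"id": "errors", "lock": threading.RLock()}
shared_fails = False
try:
    pickle.dumps(ProcessFnLike(shared_tag))
except TypeError:
    shared_fails = True  # cannot pickle '_thread.RLock' object

# Separate tag instances that only agree on the string id pickle fine;
# side outputs are matched by that id, not by object identity:
fn_a = ProcessFnLike({"id": "errors"})
fn_b = ProcessFnLike({"id": "errors"})
same_id = (pickle.loads(pickle.dumps(fn_a)).tag["id"]
           == pickle.loads(pickle.dumps(fn_b)).tag["id"])
print(shared_fails, same_id)  # -> True True
```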

On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto  wrote:
>
> Hi,
>
> I'm attempting to implement a generic error handling ProcessFunction in 
> pyflink.  Given a user provided function, I want to invoke that function for 
> each element in the DataStream, catch any errors thrown by the function, 
> convert those errors into events, and then emit those event errors to a 
> different DataStream sink.
>
> I'm trying to do this by reusing the same OutputTag in each of my 
> ProcessFunctions.
> However, this does not work, I believe because I am using the same 
> error_output_tag in two different functions, which causes it to have a 
> reference(?) to _thread.RLock, which causes the ProcessFunction instance to 
> be un-pickleable.
>
> Here's a standalone example of the failure using the canonical word_count 
> example.
>
> My questions are:
> 1. Does Flink support re-use of the same OutputTag instance in multiple 
> ProcessFunctions?
> 2. If so, is my problem pyflink / python / pickle specific?
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>
>


Re: Setting state TTL separately for each stream in a regular join

2023-02-14 Thread Jane Chan
Hi,

Flink SQL does not yet support setting the state TTL separately for each stream, 
but the community plans to support this feature; a FLIP will be proposed soon, 
and you are welcome to join the discussion.

Best regards,
Jane

On Wed, Feb 15, 2023 at 11:13 AM Jason_H  wrote:

> Hi all,
> I've run into a problem: when doing a two-stream join with Flink SQL, I use a
> regular join between a data stream and a dimension-table stream. Now I'd like
> to set a TTL for the data stream alone. How can I do that? The Flink version
> is 1.15.2.
>
>
> Jason_H
> hyb_he...@163.com


Re: DI in flink

2023-02-14 Thread Yashoda Krishna T
Thanks Austin.
I can make use of Rich functions to solve my problem.

Thanks
Yashoda

On Wed, Feb 15, 2023 at 12:42 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> (note: please keep user@flink.apache.org included in replies)
>
> Ah, I see. Then no, this is not provided by Flink. When I've used
> dependency injection with Flink in the past, I instantiated everything in the
> `open()` method of the Flink Rich* classes. Could you solve this by having
> a common base Sink class or builder that does the configuring? I'm just
> wondering why it's necessary to solve it in Flink itself.
>
> Best,
> Austin
>
> On Tue, Feb 14, 2023 at 11:05 AM Yashoda Krishna T <
> yashoda.kris...@unbxd.com> wrote:
>
>> This is my use case.
>> I have a sink function to push streaming data to S3. And I have a class,
>> let's call it S3ConnProvider, that provides me a connection object to S3, and a
>> class, let's say S3Util, that has functions over S3 and injects
>> S3ConnProvider.
>> If dependency injection works I can inject S3Util alone in my
>> SinkFunction class. If not, I have to initialize S3ConnProvider first and
>> then S3Util.
>> This can become complex if there are too many initializations
>> required, depending on the use case.
>>
>>>


Setting state TTL separately for each stream in a regular join

2023-02-14 Thread Jason_H
Hi all,
I've run into a problem: when doing a two-stream join with Flink SQL, I use a 
regular join between a data stream and a dimension-table stream. Now I'd like to 
set a TTL for the data stream alone. How can I do that? The Flink version is 
1.15.2.


Jason_H
hyb_he...@163.com

Reusing the same OutputTag in multiple ProcessFunctions

2023-02-14 Thread Andrew Otto
Hi,

I'm attempting to implement a generic error handling ProcessFunction in
pyflink.  Given a user provided function, I want to invoke that function
for each element in the DataStream, catch any errors thrown by
the function, convert those errors into events, and then emit those event
errors to a different DataStream sink.

I'm trying to do this by reusing the same OutputTag in each of my
ProcessFunctions.
However, this does not work, I believe because I am using the same
error_output_tag in two different functions, which causes it to have a
reference(?) to _thread.RLock, which causes the ProcessFunction instance
to be un-pickleable.

Here's a standalone example of the failure using the canonical word_count
example.

My questions are:
1. Does Flink support re-use of the same OutputTag instance in multiple
ProcessFunctions?
2. If so, is my problem pyflink / python / pickle specific?

Thanks!
-Andrew Otto
 Wikimedia Foundation


Re: DI in flink

2023-02-14 Thread Austin Cawley-Edwards
(note: please keep user@flink.apache.org included in replies)

Ah, I see. Then no, this is not provided by Flink. When I've used
dependency injection with Flink in the past, I instantiated everything in the
`open()` method of the Flink Rich* classes. Could you solve this by having
a common base Sink class or builder that does the configuring? I'm just
wondering why it's necessary to solve it in Flink itself.

Best,
Austin

On Tue, Feb 14, 2023 at 11:05 AM Yashoda Krishna T <
yashoda.kris...@unbxd.com> wrote:

> This is my use case.
> I have a sink function to push streaming data to S3. And I have a class,
> let's call it S3ConnProvider, that provides me a connection object to S3, and a
> class, let's say S3Util, that has functions over S3 and injects
> S3ConnProvider.
> If dependency injection works I can inject S3Util alone in my SinkFunction
> class. If not, I have to initialize S3ConnProvider first and then S3Util.
> This can become complex if there are too many initializations
> required, depending on the use case.
>
>>


Re: DI in flink

2023-02-14 Thread Austin Cawley-Edwards
What would be the benefits and features over what can be done in user land?

On Tue, Feb 14, 2023 at 10:41 Yashoda Krishna T 
wrote:

> Hi Austin
>
> Yes, this can be done in user land.
> Can we do it in flink land too?
>
> Thanks
> Yashoda
>
> On Tue, 14 Feb 2023, 9:05 pm Austin Cawley-Edwards, <
> austin.caw...@gmail.com> wrote:
>
>> Hey Yashoda,
>>
>> This can be done in userland (e.g. with Dagger)
>> unless you're wanting Flink to do something in addition?
>>
>> Best,
>> Austin
>>
>> On Tue, Feb 14, 2023 at 10:01 AM Yashoda Krishna T <
>> yashoda.kris...@unbxd.com> wrote:
>>
>>> Does flink support dependency injection in flink task functions in java?
>>> If not is there an alternative?
>>>
>>


Re: DI in flink

2023-02-14 Thread Austin Cawley-Edwards
Hey Yashoda,

This can be done in userland (e.g. with Dagger) unless
you're wanting Flink to do something in addition?

Best,
Austin

On Tue, Feb 14, 2023 at 10:01 AM Yashoda Krishna T <
yashoda.kris...@unbxd.com> wrote:

> Does flink support dependency injection in flink task functions in java?
> If not is there an alternative?
>


DI in flink

2023-02-14 Thread Yashoda Krishna T
Does Flink support dependency injection in Flink task functions in Java? If
not, is there an alternative?


Re: Watermark in global commit

2023-02-14 Thread Jan Lukavský

Hi,

I'm not an expert on Flink specifically, but your problem might be easier to 
solve when broken down into two steps: first create a "stable" input for 
downstream processing, which might include a specific watermark. In 
Flink, the "stability" of input for downstream processing is ensured by 
a checkpoint. You would therefore need to wait for a checkpoint, 
buffering intermediate data in state (and produce a particular 
watermark as a data element, because watermarks in general need not be 
'stable'). Once a checkpoint is completed, you would flush the buffer 
to downstream operators; one would create the parquet files, the other 
would do whatever action needs to be taken based on the watermark. The 
checkpoint ensures that the two tasks would be eventually consistent (if 
this is sufficient for your case).


In Apache Beam, we call this a transform that 
'@RequiresStableInput' [1]; the implementation in Flink is as 
described above.


 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html
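The buffer-until-checkpoint pattern described above can be modeled in a few lines of plain Python. This is a toy simulation, not Flink's actual operator API: elements are held back and only released downstream once the checkpoint completes (a real implementation would keep the buffer in Flink managed state).

```python
class StableInputOperator:
    """Toy model of @RequiresStableInput: elements are buffered and only
    released downstream once a checkpoint completes, so downstream sees a
    replay-stable input."""
    def __init__(self):
        self.buffer = []   # would be managed state in Flink
        self.emitted = []  # stands in for the downstream operators

    def process(self, element):
        # Buffer instead of emitting directly.
        self.buffer.append(element)

    def notify_checkpoint_complete(self):
        # Only now is the input considered stable; flush downstream.
        self.emitted.extend(self.buffer)
        self.buffer.clear()

op = StableInputOperator()
op.process("a")
op.process("b")
assert op.emitted == []          # nothing downstream before the checkpoint
op.notify_checkpoint_complete()
print(op.emitted)                # -> ['a', 'b']
```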


On 2/14/23 13:23, Tobias Fröhlich wrote:

Dear flink team,

I am facing the following problem: I would need to write events to parquet 
files using the FileSink. Subsequently, I want to do something else in a global 
commit where I need the corresponding watermark. However, the 
org.apache.flink.connector.file.sink.FileSink forces the type of the 
committables to be org.apache.flink.connector.file.sink.FileSinkCommittable 
which can not carry watermarks.

Details:

As far as I understand the idea of a two-phase commit with a global committer, 
the committables are used for passing information from the writer to the global 
committer. This is done by implementing two methods in the writer and the 
committer, respectively:

  1. Collection<CommT> TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit(), which returns a 
collection of committables of some type CommT, and
  2. void Committer::commit(Collection<CommitRequest<CommT>>), which uses this 
collection.

In general, the type CommT can be chosen arbitrarily. So, if the watermark is needed 
in the global commit, it is possible to use a customized object that contains a field 
for the watermark. However, if the class 
org.apache.flink.connector.file.sink.FileSink is used, the type for the 
committables is always org.apache.flink.connector.file.sink.FileSinkCommittable which 
does not have a field that can be used for the watermark.

The only solution I found was forking the Flink source code and augmenting 
it in the following way:

   1. adding a field to FileSinkCommittable ("private long watermark;" with 
getter and setter)
   2. changing the FileSinkCommittableSerializer accordingly (this makes it 
necessary to define a new version)
   3. in FileWriter::prepareCommit(), adding a loop over all committables to set 
the watermark


Am I missing something? Is there an easier way to get the watermarks from the 
writer to the global committer? If not, is it justified to propose a feature 
request?

Best regards and thanks in advance
Tobias Fröhlich



Metrics or runtimeContext in global commit

2023-02-14 Thread Tobias Fröhlich
Dear flink team,

I would like to use metrics (which are then written to an InfluxDB) in the 
method 
org.apache.flink.api.connector.sink2.Committer<CommT>::commit(Collection<CommitRequest<CommT>> committables) 
that I use for the global commit. I use the helper method 
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit 
topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was cloning the Flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that 
implements Committer<CommT> and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext), adding the following lines at the end:

if (committer instanceof CommitterWithRuntimeContext) {
    ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
    ((CommitterWithRuntimeContext) committer).init();
}

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich


Watermark in global commit

2023-02-14 Thread Tobias Fröhlich
Dear flink team,

I am facing the following problem: I would need to write events to parquet 
files using the FileSink. Subsequently, I want to do something else in a global 
commit where I need the corresponding watermark. However, the 
org.apache.flink.connector.file.sink.FileSink forces the type of the 
committables to be org.apache.flink.connector.file.sink.FileSinkCommittable, 
which cannot carry watermarks.

Details:

As far as I understand the idea of a two-phase commit with a global committer, 
the committables are used for passing information from the writer to the global 
committer. This is done by implementing two methods in the writer and the 
committer, respectively:

 1. Collection<CommT> TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit(), which returns a 
collection of committables of some type CommT, and 
 2. void Committer::commit(Collection<CommitRequest<CommT>>), which uses this 
collection.

In general, the type CommT can be chosen arbitrarily. So, if the watermark is 
needed in the global commit, it is possible to use a customized object that 
contains a field for the watermark. However, if the class 
org.apache.flink.connector.file.sink.FileSink is used, the type for the 
committables is always org.apache.flink.connector.file.sink.FileSinkCommittable 
which does not have a field that can be used for the watermark.

The only solution I found was forking the Flink source code and augmenting 
it in the following way:

  1. adding a field to FileSinkCommittable ("private long watermark;" with 
getter and setter)
  2. changing the FileSinkCommittableSerializer accordingly (this makes it 
necessary to define a new version)
  3. in FileWriter::prepareCommit(), adding a loop over all committables to set 
the watermark
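The three changes above can be modeled in a few lines of plain Python (a dataclass as a stand-in for the forked Java FileSinkCommittable; the field and function names are illustrative, and the serializer-versioning step is omitted).

```python
from dataclasses import dataclass

@dataclass
class FileSinkCommittableWithWatermark:
    # Toy model of the forked committable: the original carries the pending
    # file; the fork adds a watermark field (step 1).
    pending_file: str
    watermark: int = 0

def prepare_commit(pending_files, current_watermark):
    # Step 3: stamp every committable with the current watermark so the
    # global committer can read it later.
    return [FileSinkCommittableWithWatermark(f, current_watermark)
            for f in pending_files]

committables = prepare_commit(["part-0", "part-1"], 1_676_381_000_000)
print(len(committables))  # -> 2
```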


Am I missing something? Is there an easier way to get the watermarks from the 
writer to the global committer? If not, is it justified to propose a feature 
request?

Best regards and thanks in advance
Tobias Fröhlich



Re: Clearing the state of a running job

2023-02-14 Thread Shammon FY
Hi

This cannot be done; I'd suggest restarting the job.

On Mon, Feb 13, 2023 at 4:33 PM Jason_H  wrote:

> The problem I ran into: in Flink, TTL is already set wherever state is used,
> but after the upstream and downstream cleared their data I wanted to re-test
> and found that Flink's results do not start accumulating from 0 again; they
> keep accumulating on top of the previous results. Without restarting the job,
> is there any way to handle this?
>
>
> Concretely: without stopping the Flink job, how can the state data of the
> running job be cleared, so that the computed results start accumulating from
> 0 again?
>
>
> Without restarting the job, would clearing the state data have the same
> effect as restarting it, avoiding the impact of accumulated state on the
> results?
> Jason_H
> hyb_he...@163.com
> |