Re: subscribe

2023-03-15 Thread Hang Ruan
Please send an e-mail to user-subscr...@flink.apache.org to subscribe to
the Flink user mailing list.

Best,
Hang

mark wrote on Wed, Mar 15, 2023 at 22:07:

> subscribe
>


[ANNOUNCE] Apache Flink 1.15.4 released

2023-03-15 Thread Danny Cranmer
The Apache Flink community is very happy to announce the release of Apache
Flink 1.15.4, which is the fourth bugfix release for the Apache Flink 1.15
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/2023/03/15/apache-flink-1.15.4-release-announcement/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352526

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Feel free to reach out to the release managers (or respond to this thread)
with feedback on the release process. Our goal is to constantly improve the
release process. Feedback on what could be improved or on things that didn't
go so well is appreciated.

Regards,
Danny


Re: Kafka sql with validation exception

2023-03-15 Thread Lasse Nedergaard
I got it to work. Thanks for pointing me in the right direction. I had some
Flink dependencies that weren't set to provided, and I removed
flink-sql-connector-kafka; that seems to fix the problem. Thanks once again.

Med venlig hilsen / Best regards
Lasse Nedergaard

On 15 Mar 2023, at 15:21, Lasse Nedergaard wrote:

Hi. Thanks Shammon. You are right,
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory is
not in the file. I'm using the shade plugin as described, and the only
difference from my other projects is the nested project structure: I have
"my project"/Flink/"my flink project"/src/main/Java/... So if you have any
idea why it isn't shaded correctly, it would help a lot.

Med venlig hilsen / Best regards
Lasse Nedergaard

On 15 Mar 2023, at 13:43, Hang Ruan wrote:

Hi, Lasse,

I think you should first verify the situation as Shammon said. Maybe you need
to use the maven-shade-plugin like this to package, and make sure the files in
`META-INF/services` are merged together:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Best,
Hang

Shammon FY wrote on Wed, Mar 15, 2023 at 19:21:

Hi Lasse

I think you can first check whether the file
`META-INF/services/org.apache.flink.table.factories.Factory` is in your uber
jar and whether
`org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory`
is listed in that file; Flink creates the table factory from that file. Then
you can check whether your uber jar is in the classpath of the Flink cluster.

Best,
Shammon FY

On Wed, Mar 15, 2023 at 6:35 PM Lasse Nedergaard wrote:

Hi.

I have a simple job creating a table from Kafka. It works perfectly on my
local machine, but when I build an uber jar and use the official Flink image I
get a validation exception:

Could not find any factory for identifier 'kafka' that implements
org.apache.flink.table.factories.DynamicTableFactory in the classpath.

The uber jar contains flink-connector-kafka and flink-sql-connector-kafka.

I can see that on my local machine it calls the discovery factory in
flink-table-common, but on my cluster it uses flink-table-api-java-uber.

And the list of available identifiers doesn't contain 'kafka' and
'upsert-kafka' as it does on my local machine. 🤔

Does anyone have a clue where I should look for the problem?

Med venlig hilsen / Best regards
Lasse Nedergaard
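
For readers hitting the same issue: Lasse's fix above amounts to marking the
core Flink dependencies as provided so they are not bundled into the uber jar,
while keeping the connector at the default scope. A minimal sketch (the exact
artifacts and the ${flink.version} property are assumptions, not taken from
Lasse's POM):

<!-- Sketch only: the cluster image already ships the core Flink APIs. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
<!-- The connector stays at compile scope so it is shaded into the jar. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>${flink.version}</version>
</dependency>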





Re: IntervalJoin invisibly becomes a regular Join - why?

2023-03-15 Thread Leonard Xu
> 
> CREATE TEMPORARY VIEW filteredResults AS
> SELECT * from suspiciousOrders WHERE small_ts > large_ts;

It looks like, after adding that condition, the final expanded query no longer
matches the condition[1] that lets the planner recognize it as an interval
join. It's not a bug: an interval join is a special case of a regular join, so
the result is still correct.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/joining/#interval-join
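
For reference, the planner only recognizes an interval join when, besides at
least one equi-join predicate, the join condition bounds the time attributes
of both inputs with patterns like the following (examples from the Flink SQL
joins documentation):

-- valid interval-join time conditions on time attributes ltime and rtime
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

Once the extra predicate small_ts > large_ts (i.e. ts > order_ts) is merged
into the join condition during view expansion, the condition no longer fits
this shape, so the planner falls back to a regular join.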
 

Re: Kafka sql with validation exception

2023-03-15 Thread Lasse Nedergaard
Hi. Thanks Shammon. You are right,
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory is
not in the file. I'm using the shade plugin as described, and the only
difference from my other projects is the nested project structure: I have
"my project"/Flink/"my flink project"/src/main/Java/... So if you have any
idea why it isn't shaded correctly, it would help a lot.

Med venlig hilsen / Best regards
Lasse Nedergaard

On 15 Mar 2023, at 13:43, Hang Ruan wrote:

Hi, Lasse,

I think you should first verify the situation as Shammon said. Maybe you need
to use the maven-shade-plugin like this to package, and make sure the files in
`META-INF/services` are merged together:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Best,
Hang

Shammon FY wrote on Wed, Mar 15, 2023 at 19:21:

Hi Lasse

I think you can first check whether the file
`META-INF/services/org.apache.flink.table.factories.Factory` is in your uber
jar and whether
`org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory`
is listed in that file; Flink creates the table factory from that file. Then
you can check whether your uber jar is in the classpath of the Flink cluster.

Best,
Shammon FY

On Wed, Mar 15, 2023 at 6:35 PM Lasse Nedergaard wrote:

Hi.

I have a simple job creating a table from Kafka. It works perfectly on my
local machine, but when I build an uber jar and use the official Flink image I
get a validation exception:

Could not find any factory for identifier 'kafka' that implements
org.apache.flink.table.factories.DynamicTableFactory in the classpath.

The uber jar contains flink-connector-kafka and flink-sql-connector-kafka.

I can see that on my local machine it calls the discovery factory in
flink-table-common, but on my cluster it uses flink-table-api-java-uber.

And the list of available identifiers doesn't contain 'kafka' and
'upsert-kafka' as it does on my local machine. 🤔

Does anyone have a clue where I should look for the problem?

Med venlig hilsen / Best regards
Lasse Nedergaard





IntervalJoin invisibly becomes a regular Join - why?

2023-03-15 Thread mark
Hello,
I'm seeing some strange behaviour in Flink SQL where adding a new SELECT
statement causes a previously created Interval Join to be changed into a
regular Join. I'm concerned because the Flink docs make clear that regular
Joins are not safe because their memory usage can grow indefinitely.

I have put a worked example in https://github.com/mnuttall/flink-debug. I
have an interval join,

CREATE TEMPORARY VIEW suspiciousOrders AS
SELECT s.orderId, s.customer, s.product, s.quantity AS order_quantity,
l.cancel_quantity, l.order_ts AS large_ts, s.ts as small_ts, l.cancel_ts
FROM smallOrders s JOIN largeCancellations l
ON s.product = l.product AND s.customer = l.customer
WHERE s.ts BETWEEN l.cancel_ts - interval '1' day AND l.cancel_ts;

which evaluates to

[13]:IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-86400000, leftUpperBound=0, leftTimeIndex=0,
rightTimeIndex=1], where=[((product = product0) AND (customer = customer0)
AND (ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts <=
cancel_ts))], select=[ts, orderId, customer, product, quantity, order_ts,
cancel_ts, product0, customer0, cancel_quantity])
+- [14]:Calc(select=[orderId, customer, product, quantity AS
order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts,
cancel_ts])
   +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity,
cancel_quantity])]
  +- Sink: Collect table sink

but adding a further temporary view

CREATE TEMPORARY VIEW filteredResults AS
SELECT * from suspiciousOrders WHERE small_ts > large_ts;

changes the interval join to a regular join,

 [13]:Join(joinType=[InnerJoin], where=[((product = product0) AND (customer
= customer0) AND (ts >= (cancel_ts - 86400000:INTERVAL DAY)) AND (ts
<= cancel_ts) AND (ts > order_ts))], select=[ts, orderId, customer,
product, quantity, order_ts, cancel_ts, product0, customer0,
cancel_quantity], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+- [14]:Calc(select=[orderId, customer, product, quantity AS
order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts,
cancel_ts])
   +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity,
cancel_quantity])]
  +- Sink: Collect table sink

Please can someone explain what's happening here? It looks as though my
(safe) interval join is being converted to an (unsafe) regular join - is
that true?

Many thanks in advance.
Regards,

Mark Nuttall


subscribe

2023-03-15 Thread mark
subscribe


Re: Kafka sql with validation exception

2023-03-15 Thread Hang Ruan
Hi, Lasse,

I think you should first verify the situation as Shammon said.

Maybe you need to use the maven-shade-plugin like this to package, and make
sure the files in `META-INF/services` are merged together:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>


Best,
Hang

Shammon FY wrote on Wed, Mar 15, 2023 at 19:21:

> Hi Lasse
>
> I think you can first check whether there is a file
> `META-INF/services/org.apache.flink.table.factories.Factory` in your uber
> jar and whether
> `org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory`
> is listed in that file; Flink creates the table factory from that file.
> Then you can check whether your uber jar is in the classpath of the Flink
> cluster.
>
> Best,
> Shammon FY
>
>
> On Wed, Mar 15, 2023 at 6:35 PM Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
>> Hi.
>>
>> I have a simple job creating a table from Kafka. It works perfectly on my
>> local machine, but when I build an uber jar and use the official Flink
>> image I get a validation exception:
>>
>> Could not find any factory for identifier 'kafka' that implements
>> org.apache.flink.table.factories.DynamicTableFactory in the classpath.
>>
>> The uber jar contains flink-connector-kafka and
>> flink-sql-connector-kafka.
>>
>> I can see that on my local machine it calls the discovery factory in
>> flink-table-common, but on my cluster it uses flink-table-api-java-uber.
>>
>> And the list of available identifiers doesn't contain 'kafka' and
>> 'upsert-kafka' as it does on my local machine. 🤔
>>
>> Does anyone have a clue where I should look for the problem?
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>


Re: Are the Table API Connectors production ready?

2023-03-15 Thread ravi_suryavanshi.yahoo.com via user
Thank you all.

On Tuesday, 14 March, 2023 at 07:14:05 am IST, yuxia 
 wrote:  
 
The plan shows the filter has been pushed down. But remember: although it is
pushed down, the filesystem table won't accept the filter, so it will still
scan all the files.

Best regards,
Yuxia
发件人: "Maryam Moafimadani" 
收件人: "Hang Ruan" 
抄送: "yuxia" , "ravi suryavanshi" 
, "Yaroslav Tkachenko" , 
"Shammon FY" , "User" 
发送时间: 星期一, 2023年 3 月 13日 下午 10:07:57
主题: Re: Are the Table API Connectors production ready?

Hi All,

It's exciting to see file filtering in the plan for development. I am curious
whether the following query on a filesystem connector would actually push down
the filter on the metadata column `file.path`:

SELECT score, `file.path` FROM MyUserTable WHERE `file.path` LIKE '%prefix_%'

== Optimized Execution Plan ==
Calc(select=[score, file.path], where=[LIKE(file.path, '%2022070611284%')])
+- TableSourceScan(table=[[default_catalog, default_database, MyUserTable, 
filter=[LIKE(file.path, _UTF-16LE'%2022070611284%')]]], fields=[score, 
file.path])

Thanks,
Maryam
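
For context, `file.path` is a metadata column of the filesystem connector; a
minimal sketch of a table exposing it (the schema, path, and format here are
assumptions, not taken from Maryam's setup):

CREATE TABLE MyUserTable (
  score DOUBLE,
  -- read-only metadata column provided by the filesystem connector
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///data/scores',
  'format' = 'csv'
);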
On Mon, Mar 13, 2023 at 8:55 AM Hang Ruan  wrote:

Hi, yuxia,

I would like to help complete this task.

Best,
Hang
yuxia wrote on Mon, Mar 13, 2023 at 09:32:

Yeah, you're right. We don't provide filtering of files with patterns, and
actually we already have a Jira issue[1] for it. I intended to do this in the
past but didn't have much time. Anyone who is interested can take it over.
We're happy to help review.

[1] https://issues.apache.org/jira/browse/FLINK-17398

Best regards,
Yuxia
发件人: "User" 
收件人: "Yaroslav Tkachenko" , "Shammon FY" 

抄送: "User" 
发送时间: 星期一, 2023年 3 月 13日 上午 12:36:46
主题: Re: Are the Table API Connectors production ready?

Thanks a lot, Yaroslav and Shammon. I want to use the Filesystem connector. I
tried it and it works well while the job is running, but if the job is
restarted it processes all the files again.
I could not find a move or delete option for files that have already been
collected. Also, I could not find filtering using patterns. Pattern matching
is required because different files exist in the same folder.

Regards,
Ravi

On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY
 wrote:
 
Hi Ravi,

I agree with Yaroslav, and if you find any problems in use you can create an
issue in Jira: https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK
I have used kafka/jdbc/hive in production too; they work well.

Best,
Shammon
On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko  wrote:

Hi Ravi,
All of them should be production ready. I've personally used half of them in 
production.
Do you have any specific concerns? 
On Thu, Mar 9, 2023 at 9:39 AM ravi_suryavanshi.yahoo.com via user 
 wrote:

Hi,

Can anyone help me here?

Thanks and regards,
Ravi
On Monday, 27 February, 2023 at 09:33:18 am IST, ravi_suryavanshi.yahoo.com 
via user  wrote:  
 
 Hi Team,

In Flink 1.16.0, we would like to use some of the Table API Connectors for 
production. Kindly let me know if the below connectors are production ready or 
only for testing purposes.

| Name                          | Version       | Source                             | Sink                       |
| Filesystem                    |               | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink |
| Elasticsearch                 | 6.x & 7.x     | Not supported                      | Streaming Sink, Batch Sink |
| Opensearch                    | 1.x & 2.x     | Not supported                      | Streaming Sink, Batch Sink |
| Apache Kafka                  | 0.10+         | Unbounded Scan                     | Streaming Sink, Batch Sink |
| Amazon DynamoDB               |               | Not supported                      | Streaming Sink, Batch Sink |
| Amazon Kinesis Data Streams   |               | Unbounded Scan                     | Streaming Sink             |
| Amazon Kinesis Data Firehose  |               | Not supported                      | Streaming Sink             |
| JDBC                          |               | Bounded Scan, Lookup               | Streaming Sink, Batch Sink |
| Apache HBase                  | 1.4.x & 2.2.x | Bounded Scan, Lookup               | Streaming Sink, Batch Sink |
| Apache Hive                   |


Thanks and regards

-- 
Maryam Moafimadani
Senior Data Developer @Shopify

Re: Kafka sql with validation exception

2023-03-15 Thread Shammon FY
Hi Lasse

I think you can first check whether there is a file
`META-INF/services/org.apache.flink.table.factories.Factory` in your uber
jar and whether
`org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory`
is listed in that file; Flink creates the table factory from that file.
Then you can check whether your uber jar is in the classpath of the Flink
cluster.
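
A quick way to check both points from the command line (a sketch; the uber jar
name is an assumption):

# Print the service file inside the uber jar and look for the Kafka factory.
unzip -p my-uber.jar META-INF/services/org.apache.flink.table.factories.Factory \
  | grep -i kafka

# Confirm the factory class itself is bundled.
unzip -l my-uber.jar | grep KafkaDynamicTableFactory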

Best,
Shammon FY


On Wed, Mar 15, 2023 at 6:35 PM Lasse Nedergaard <
lassenedergaardfl...@gmail.com> wrote:

> Hi.
>
> I have a simple job creating a table from Kafka. It works perfectly on my
> local machine, but when I build an uber jar and use the official Flink image
> I get a validation exception:
>
> Could not find any factory for identifier 'kafka' that implements
> org.apache.flink.table.factories.DynamicTableFactory in the classpath.
>
> The uber jar contains flink-connector-kafka and flink-sql-connector-kafka.
>
> I can see that on my local machine it calls the discovery factory in
> flink-table-common, but on my cluster it uses flink-table-api-java-uber.
>
> And the list of available identifiers doesn't contain 'kafka' and
> 'upsert-kafka' as it does on my local machine. 🤔
>
> Does anyone have a clue where I should look for the problem?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Kafka sql with validation exception

2023-03-15 Thread Lasse Nedergaard
Hi. 

I have a simple job creating a table from Kafka. It works perfectly on my
local machine, but when I build an uber jar and use the official Flink image I
get a validation exception:

Could not find any factory for identifier 'kafka' that implements
org.apache.flink.table.factories.DynamicTableFactory in the classpath.

The uber jar contains flink-connector-kafka and flink-sql-connector-kafka.

I can see that on my local machine it calls the discovery factory in
flink-table-common, but on my cluster it uses flink-table-api-java-uber.

And the list of available identifiers doesn't contain 'kafka' and
'upsert-kafka' as it does on my local machine. 🤔

Does anyone have a clue where I should look for the problem?

Med venlig hilsen / Best regards
Lasse Nedergaard



Re: WELCOME to user@flink.apache.org

2023-03-15 Thread Martijn Visser
Hi Penny,

Once you complete steps 1 and 2, you are subscribed to the user mailing list,
and you can then post the email you want to send by performing step 3. I can
see why the email can be confusing, though.

Best regards,

Martijn

On Sat, Mar 11, 2023 at 11:42 AM Penny Rastogi 
wrote:

> Hi There,
> As per the guidelines:
>
> If you’d like to post to the mailing list, you need to
>
>1. subscribe to the mailing list by sending an email to
>user-subscr...@flink.apache.org,
>2. confirm the subscription by replying to the confirmation email, and
>3. send your email to user@flink.apache.org
>
>
> I am sending the confirmation mail to user@flink.apache.org
>
> Please acknowledge.
>
> Regards,
> Vallari
>
> -- Forwarded message -
> From: 
> Date: Sat, Mar 11, 2023 at 4:08 PM
> Subject: WELCOME to user@flink.apache.org
> To: 
>
>
> Hi! This is the ezmlm program. I'm managing the
> user@flink.apache.org mailing list.
>
> Acknowledgment: I have added the address
>
>walls.fl...@gmail.com
>
> to the user mailing list.
>
> Welcome to user@flink.apache.org!
>
> Please save this message so that you know the address you are
> subscribed under, in case you later want to unsubscribe or change your
> subscription address.
>
>
> --- Administrative commands for the user list ---
>
> I can handle administrative requests automatically. Please
> do not send them to the list address! Instead, send
> your message to the correct command address:
>
> To subscribe to the list, send a message to:
>
>
> To remove your address from the list, send a message to:
>
>
> Send mail to the following for info and FAQ for this list:
>
>
>
> Similar addresses exist for the digest list:
>
>
>
> To get messages 123 through 145 (a maximum of 100 per request), mail:
>
>
> To get an index with subject and author for messages 123-456 , mail:
>
>
> They are always returned as sets of 100, max 2000 per request,
> so you'll actually get 100-499.
>
> To receive all messages with the same subject as message 12345,
> send a short message to:
>
>
> The messages should contain one line or word of text to avoid being
> treated as sp@m, but I will ignore their content.
> Only the ADDRESS you send to is important.
>
> You can start a subscription for an alternate address,
> for example "john@host.domain", just add a hyphen and your
> address (with '=' instead of '@') after the command word:
> 
>
> To stop subscription for this address, mail:
> 
>
> In both cases, I'll send a confirmation message to that address. When
> you receive it, simply reply to it to complete your subscription.
>
> If despite following these instructions, you do not get the
> desired results, please contact my owner at
> user-ow...@flink.apache.org. Please be patient, my owner is a
> lot slower than I am ;-)
>

Re: Watermarks lagging behind events that generate them

2023-03-15 Thread Shammon FY
Hi Alexis

Currently, I think checkpoints and savepoints do not save watermarks. How to
deal with watermarks at checkpoint/savepoint time is a good question; we can
discuss it on the dev mailing list.

Best,
Shammon FY


On Wed, Mar 15, 2023 at 4:22 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Shammon, thanks for the info. I was hoping the savepoint would include
> the watermark, but I'm not sure that would make sense in every scenario.
>
> Regards,
> Alexis.
>
> On Tue, Mar 14, 2023 at 12:59, Shammon FY wrote:
>
>> Hi Alexis
>>
>> In some watermark generators such as BoundedOutOfOrderTimestamps,
>> the timestamp of watermark will be reset to Long.MIN_VALUE if the subtask
>> is restarted and no event from source is processed.
>>
>> Best,
>> Shammon FY
>>
>> On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hi David, thanks for the answer. One follow-up question: will the
>>> watermark be reset to Long.MIN_VALUE every time I restart a job with
>>> savepoint?
>>>
>>> On Tue, Mar 14, 2023 at 05:08, David Anderson <
>>> dander...@apache.org> wrote:
>>>
 Watermarks always follow the corresponding event(s). I'm not sure why
 they were designed that way, but that is how they are implemented.
 Windows maintain this contract by emitting all of their results before
 forwarding the watermark that triggered the results.

 David

 On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
 >
 > Hi Alexis
 >
 > Do you use both event-time watermark generator and TimerService for
 processing time in your job? Maybe you can try using event-time watermark
 first.
 >
 > Best,
 > Shammon.FY
 >
 > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
 sarda.espin...@gmail.com> wrote:
 >>
 >> Hello,
 >>
 >> I recently ran into a weird issue with a streaming job in Flink
 1.16.1. One of my functions (KeyedProcessFunction) has been using
 processing time timers. I now want to execute the same job based on a
 historical data dump, so I had to adjust the logic to use event time timers
 in that case (and did not use BATCH execution mode). Since my data has a
 timestamp field, I implemented a custom WatermarkGenerator that always
 emits a watermark with that timestamp in the onEvent callback, and does
 nothing in the onPeriodicEmit callback.
 >>
 >> My problem is that, sometimes, the very first time my function calls
 TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
 some false triggers when the first watermark actually arrives.
 >>
 >> I would have expected that, if WatermarkGenerator.onEvent emits a
 watermark, it would be sent before the corresponding event, but maybe this
 is not always the case?
 >>
 >> In case it's relevant, a brief overview of my job's topology:
 >>
 >> Source1 -> Broadcast
 >>
 >> Source2 ->
 >>   keyBy ->
 >>   connect(Broadcast) ->
 >>   process ->
 >>   filter ->
 >>   assignTimestampsAndWatermarks -> // newly added for historical data
 >>   keyBy ->
 >>   process // function that uses timers
 >>
 >> Regards,
 >> Alexis.

>>>
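
For reference, a minimal sketch of the kind of per-event generator described
in this thread (the MyEvent type and its getTs() accessor are assumptions; the
WatermarkGenerator interface is Flink's):

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Emits a watermark for every event and nothing periodically.
public class PerEventWatermarks implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // The watermark carries the event's own timestamp. As David notes,
        // the watermark is forwarded after the event itself.
        output.emitWatermark(new Watermark(event.getTs()));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Intentionally empty: no periodic watermarks.
    }
}

Until the first such watermark arrives after a restart, downstream operators
see Long.MIN_VALUE from TimerService.currentWatermark, which matches the false
triggers described above.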


Re: is there any detrimental side-effect if i set the max parallelism as 32768

2023-03-15 Thread Leonard Xu

> 
> Unsubscribe

Please send any email to user-unsubscr...@flink.apache.org to unsubscribe from
the user@flink.apache.org mailing list; sending mail to user@flink.apache.org
will not cancel the subscription.


> Sent from my iPhone
> 
> 
> -- Original --
> From: Tony Wei 
> Date: Tue,Mar 14,2023 1:11 PM
> To: David Anderson 
> Cc: Hangxiang Yu , user 
> Subject: Re: is there any detrimental side-effect if i set the max
> parallelism as 32768
> 
> Hi Hangxiang, David,
> 
> Thank you for your replies. Your responses are very helpful.
> 
> Best regards,
> Tony Wei
> 
> David Anderson <dander...@apache.org> wrote on Tue, Mar 14, 2023 at 12:12 PM:
> I believe there is some noticeable overhead if you are using the
> heap-based state backend, but with RocksDB I think the difference is
> negligible.
> 
> David
> 
> On Tue, Mar 7, 2023 at 11:10 PM Hangxiang Yu wrote:
> >
> > Hi, Tony.
> > "be detrimental to performance" means that some extra space overhead of the 
> > field of the key-group may influence performance.
> > As we know, Flink will write the key group as the prefix of the key to 
> > speed up rescaling.
> > So the format will be like: key group | key len | key | ..
> > You could check the relationship between max parallelism and bytes of key 
> > group as below:
> > --
> > max parallelism | bytes of key group
> >             128 | 1
> >           32768 | 2
> > --
> > So I think the cost will be very small if the real key length >> 2 bytes.
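
A compact way to read the table above (an illustrative sketch only; this
helper is not Flink API):

public final class KeyGroupPrefix {
    // Bytes used by the key-group prefix written before each key in RocksDB
    // keyed state, mirroring the table above: up to 128 key groups fit in
    // one byte; a larger max parallelism needs two bytes.
    static int keyGroupPrefixBytes(int maxParallelism) {
        return maxParallelism <= 128 ? 1 : 2;
    }

    public static void main(String[] args) {
        System.out.println(keyGroupPrefixBytes(128));   // 1
        System.out.println(keyGroupPrefixBytes(32768)); // 2
    }
}

So raising the max parallelism from 128 to 32768 adds one byte per stored key,
which is negligible compared to realistic key lengths.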
> >
> > On Wed, Mar 8, 2023 at 1:06 PM Tony Wei wrote:
> >>
> >> Hi experts,
> >>
> >>> Setting the maximum parallelism to a very large value can be detrimental 
> >>> to performance because some state backends have to keep internal data 
> >>> structures that scale with the number of key-groups (which are the 
> >>> internal implementation mechanism for rescalable state).
> >>>
> >>> Changing the maximum parallelism explicitly when recovery from original 
> >>> job will lead to state incompatibility.
> >>
> >>
> >> I read the section above from the official Flink documentation [1], and
> >> I'm wondering what the details are regarding this side-effect.
> >>
> >> Suppose that I have a Flink SQL job with large state, large parallelism,
> >> and RocksDB as my state backend.
> >> I would like to set the max parallelism to 32768, so that I don't need to
> >> worry about whether the max parallelism is divisible by the parallelism
> >> whenever I want to scale my job, because the number of key groups will not
> >> differ too much between subtasks.
> >>
> >> I'm wondering if this is a good practice, because based on the official
> >> documentation it is actually not recommended.
> >> If possible, I would like to know the details of this side-effect: which
> >> state backends have this issue, and why?
> >> Please give me some advice. Thanks in advance.
> >>
> >> Best regards,
> >> Tony Wei
> >>
> >> [1] 
> >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism
> >>  
> >> 
> >
> >
> >
> > --
> > Best,
> > Hangxiang.



Re: Watermarks lagging behind events that generate them

2023-03-15 Thread Alexis Sarda-Espinosa
Hi Shammon, thanks for the info. I was hoping the savepoint would include
the watermark, but I'm not sure that would make sense in every scenario.

Regards,
Alexis.

On Tue, Mar 14, 2023 at 12:59, Shammon FY wrote:

> Hi Alexis
>
> In some watermark generators such as BoundedOutOfOrderTimestamps,
> the timestamp of watermark will be reset to Long.MIN_VALUE if the subtask
> is restarted and no event from source is processed.
>
> Best,
> Shammon FY
>
> On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi David, thanks for the answer. One follow-up question: will the
>> watermark be reset to Long.MIN_VALUE every time I restart a job with
>> savepoint?
>>
>> On Tue, Mar 14, 2023 at 05:08, David Anderson <
>> dander...@apache.org> wrote:
>>
>>> Watermarks always follow the corresponding event(s). I'm not sure why
>>> they were designed that way, but that is how they are implemented.
>>> Windows maintain this contract by emitting all of their results before
>>> forwarding the watermark that triggered the results.
>>>
>>> David
>>>
>>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
>>> >
>>> > Hi Alexis
>>> >
>>> > Do you use both event-time watermark generator and TimerService for
>>> processing time in your job? Maybe you can try using event-time watermark
>>> first.
>>> >
>>> > Best,
>>> > Shammon.FY
>>> >
>>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> I recently ran into a weird issue with a streaming job in Flink
>>> 1.16.1. One of my functions (KeyedProcessFunction) has been using
>>> processing time timers. I now want to execute the same job based on a
>>> historical data dump, so I had to adjust the logic to use event time timers
>>> in that case (and did not use BATCH execution mode). Since my data has a
>>> timestamp field, I implemented a custom WatermarkGenerator that always
>>> emits a watermark with that timestamp in the onEvent callback, and does
>>> nothing in the onPeriodicEmit callback.
>>> >>
>>> >> My problem is that, sometimes, the very first time my function calls
>>> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
>>> some false triggers when the first watermark actually arrives.
>>> >>
>>> >> I would have expected that, if WatermarkGenerator.onEvent emits a
>>> watermark, it would be sent before the corresponding event, but maybe this
>>> is not always the case?
>>> >>
>>> >> In case it's relevant, a brief overview of my job's topology:
>>> >>
>>> >> Source1 -> Broadcast
>>> >>
>>> >> Source2 ->
>>> >>   keyBy ->
>>> >>   connect(Broadcast) ->
>>> >>   process ->
>>> >>   filter ->
>>> >>   assignTimestampsAndWatermarks -> // newly added for historical data
>>> >>   keyBy ->
>>> >>   process // function that uses timers
>>> >>
>>> >> Regards,
>>> >> Alexis.
>>>
>>