Re: How to prevent check pointing of timers ?

2022-02-07 Thread Frank Dekervel

Hello,

I guess you have already evaluated moving to event time and were not 
able to? This really seems to be a case for event-time timers. It would 
require some effort (including choosing a good watermark strategy), but 
it would solve all of your problems.
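
For what it's worth, a minimal sketch of the event-time variant, assuming a 
KeyedProcessFunction per connection key; the Packet type and its fields are 
placeholders, not taken from the original job, and the source would 
additionally need a WatermarkStrategy (e.g. forBoundedOutOfOrderness) with a 
timestamp assigner:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionTimeoutFunction
        extends KeyedProcessFunction<String, SessionTimeoutFunction.Packet, String> {

    /** Hypothetical minimal event type standing in for a captured packet. */
    public static class Packet {
        public String connectionKey;
        public long captureTimeMillis;
    }

    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timeout-timer", Long.class));
    }

    @Override
    public void processElement(Packet packet, Context ctx, Collector<String> out) throws Exception {
        // Push the 60 s timeout forward based on the packet's own timestamp, not the wall clock.
        Long previous = timerState.value();
        if (previous != null) {
            ctx.timerService().deleteEventTimeTimer(previous);
        }
        long timeout = packet.captureTimeMillis + 60_000L;
        ctx.timerService().registerEventTimeTimer(timeout);
        timerState.update(timeout);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Fires only once the watermark passes the timeout, so a restored job
        // has to see newer data before sessions are closed.
        out.collect("session closed for key " + ctx.getCurrentKey());
    }
}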


Frank

On 08.02.22 08:42, Alex Drobinsky wrote:

Sure :) The problem could be defined as follows:
Imagine you have a stream of data, for example network traffic.
This network traffic is keyed by source address / source port / 
destination address / destination port / protocol type.

Every connection can be "completed" in two ways:
1) we encountered a packet that indicates the end of the connection 
according to the protocol
2) we did not receive any packet for that connection during the last 60 
seconds


In the second case, Flink calls the onTimer function and the session is 
closed.
However, if a crash happens and a checkpoint is restored, onTimer is 
called immediately and the session is closed prematurely.
Now, I would like to prevent this from happening - so I have two 
solutions. The first is the workaround you have already seen in a 
previous email: the first time onTimer is triggered, it ignores the 
call and resets the timer.
The second is rather hypothetical: somehow force the timer to be 
"volatile", or reset the timer after restore. So the question is whether 
this second solution is feasible.


Tue, 8 Feb 2022 at 04:19, Yun Tang :

Hi Alex,

I think the better approach is to understand what problem you
actually ran into when restoring the timers.

Flink currently does not support removing state (including timer
state).

Best
Yun Tang

*From:* Alex Drobinsky 
*Sent:* Monday, February 7, 2022 21:09
*To:* Caizhi Weng 
*Cc:* User-Flink 
*Subject:* Re: How to prevent check pointing of timers ?
By timer I mean a regular timer on keyed state, used via the
onTimer function, for example:


public class StateWithTimer {
    public long timerValue = 0;
    public volatile boolean shouldResetTimer = true;

    public boolean resetIfMust(long timeoutInMilliseconds, TimerService timerService) {
        if (shouldResetTimer) {
            setupTimer(timeoutInMilliseconds, timerService);
            shouldResetTimer = false;
            return true;
        }
        return false;
    }

    public void setupTimer(long timeoutInMilliseconds, TimerService timerService) {
        // Cancel previous timer
        timerService.deleteProcessingTimeTimer(timerValue);
        // Register new timer
        // Should it be configurable ?
        timerValue = (timerService.currentProcessingTime() + timeoutInMilliseconds) * 1000 / 1000;
        timerService.registerProcessingTimeTimer(timerValue);
    }
}


The state which utilizes timers extends StateWithTimer above; the
resetIfMust function is the current workaround - it resets the timers
the first time after a restart from a checkpoint or a fresh start.

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception {
    MultiStorePacketState so = state.value();
    if (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, ctx.timerService())) {
        return;
    }
    closeAndReportFile(collector, so);

    ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
    state.update(so);
}


Mon, 7 Feb 2022 at 05:06, Caizhi Weng :

Hi!

Could you elaborate more on your code or share it if possible?
Which timer are you talking about? Are you using the data
stream API or SQL API? Do you mean the timer registered per
record for a window aggregation? Does mini batch aggregation
[1] solve your problem?

[1]

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation

Alex Drobinsky  wrote on Thu, 3 Feb 2022 at
20:41:

Dear flink user,

In our project, restoring the timer's state creates
numerous issues, so I would like to know
if it is possible to avoid save/restore of timers altogether.
If it isn't possible, how could I delete all registered
timers during the open function ?

Best regards,
Alexander


Re: How to prevent check pointing of timers ?

2022-02-07 Thread Alex Drobinsky
Sure :) The problem could be defined as follows:
Imagine you have a stream of data, for example network traffic.
This network traffic is keyed by source address / source port / destination
address / destination port / protocol type.
Every connection can be "completed" in two ways:
1) we encountered a packet that indicates the end of the connection
according to the protocol
2) we did not receive any packet for that connection during the last 60
seconds

In the second case, Flink calls the onTimer function and the session is closed.
However, if a crash happens and a checkpoint is restored, onTimer is
called immediately and the session is closed prematurely.
Now, I would like to prevent this from happening, so I have two solutions.
The first is the workaround you have already seen in a previous email:
the first time onTimer is triggered, it ignores the call and resets the
timer.
The second is rather hypothetical: somehow force the timer to be
"volatile", or reset the timer after restore. So the question is whether
this second solution is feasible.

Tue, 8 Feb 2022 at 04:19, Yun Tang :

> Hi Alex,
>
> I think the better solution is to know what the problem you have ever met
> when restoring the timers?
>
> Flink does not support to remove state (including timer state) currently.
>
> Best
> Yun Tang
> --
> *From:* Alex Drobinsky 
> *Sent:* Monday, February 7, 2022 21:09
> *To:* Caizhi Weng 
> *Cc:* User-Flink 
> *Subject:* Re: How to prevent check pointing of timers ?
>
> By timer I mean regular timer from KeyedState which utilized via function
> onTimer, for example:
>
>
> public class StateWithTimer {
> public long timerValue = 0;
> public volatile boolean shouldResetTimer = true;
>
> public boolean resetIfMust(long timeoutInMilliseconds, TimerService 
> timerService) {
> if (shouldResetTimer) {
> setupTimer(timeoutInMilliseconds, timerService);
> shouldResetTimer = false;
> return true;
> }
> return false;
> }
>
> public void setupTimer(long timeoutInMilliseconds, TimerService 
> timerService) {
> // Cancel previous timer
> timerService.deleteProcessingTimeTimer(timerValue);
> // Register new timer
> // Should it be configurable ?
> timerValue = (timerService.currentProcessingTime() + 
> timeoutInMilliseconds)*1000/1000;
> timerService.registerProcessingTimeTimer(timerValue);
> }
>
> }
>
>
> State which utilizes timers extends StateWithTimer above, the function
> resetIfMust is current workaround - it resets timers first time after
> restart from checkpoint or start.
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, 
> Collector collector) throws Exception {
>MultiStorePacketState so = state.value();
>if 
> (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, 
> ctx.timerService())) {
>   return;
>}
>closeAndReportFile(collector, so);
>
>ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>state.update(so);
> }
>
>
>
>
>
> Mon, 7 Feb 2022 at 05:06, Caizhi Weng :
>
> Hi!
>
> Could you elaborate more on your code or share it if possible? Which timer
> are you talking about? Are you using the data stream API or SQL API? Do you
> mean the timer registered per record for a window aggregation? Does mini
> batch aggregation [1] solve your problem?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation
>
> Alex Drobinsky  wrote on Thu, 3 Feb 2022 at 20:41:
>
> Dear flink user,
>
> In our project, restoring the timer's state creates numerous issues, so I
> would like to know
> if it is possible to avoid save/restore of timers altogether.
> If it isn't possible, how could I delete all registered timers during the
> open function ?
>
> Best regards,
> Alexander
>
>


Re: How to prevent check pointing of timers ?

2022-02-07 Thread Yun Tang
Hi Alex,

I think the better approach is to understand what problem you actually ran into
when restoring the timers.

Flink currently does not support removing state (including timer state).

Best
Yun Tang

From: Alex Drobinsky 
Sent: Monday, February 7, 2022 21:09
To: Caizhi Weng 
Cc: User-Flink 
Subject: Re: How to prevent check pointing of timers ?

By timer I mean regular timer from KeyedState which utilized via function 
onTimer, for example:



public class StateWithTimer {
public long timerValue = 0;
public volatile boolean shouldResetTimer = true;

public boolean resetIfMust(long timeoutInMilliseconds, TimerService 
timerService) {
if (shouldResetTimer) {
setupTimer(timeoutInMilliseconds, timerService);
shouldResetTimer = false;
return true;
}
return false;
}

public void setupTimer(long timeoutInMilliseconds, TimerService 
timerService) {
// Cancel previous timer
timerService.deleteProcessingTimeTimer(timerValue);
// Register new timer
// Should it be configurable ?
timerValue = (timerService.currentProcessingTime() + 
timeoutInMilliseconds)*1000/1000;
timerService.registerProcessingTimeTimer(timerValue);
}

}

State which utilizes timers extends StateWithTimer above, the function 
resetIfMust is current workaround - it resets timers first time after restart 
from checkpoint or start.


@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector collector) throws Exception {
   MultiStorePacketState so = state.value();
   if 
(so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout, 
ctx.timerService())) {
  return;
   }
   closeAndReportFile(collector, so);

   ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
   state.update(so);
}




Mon, 7 Feb 2022 at 05:06, Caizhi Weng :
Hi!

Could you elaborate more on your code or share it if possible? Which timer are 
you talking about? Are you using the data stream API or SQL API? Do you mean 
the timer registered per record for a window aggregation? Does mini batch 
aggregation [1] solve your problem?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation

Alex Drobinsky  wrote on Thu, 3 Feb 2022 at 20:41:
Dear flink user,

In our project, restoring the timer's state creates numerous issues, so I would 
like to know
if it is possible to avoid save/restore of timers altogether.
If it isn't possible, how could I delete all registered timers during the open 
function ?

Best regards,
Alexander


Re: Loading a Hive lookup table data into TM memory takes so long.

2022-02-07 Thread Caizhi Weng
Hi!

> If Flink is not happy with a large Hive table data

Currently it is. A Hive lookup table (currently implemented just like a
filesystem lookup table) cannot look up values by a specific key, so it
has to load all data into memory.

> Did you mean putting the Hive table data into a Kafka/Kinesis and joining
> the main stream

This is one solution if you'd like to use a streaming join. If you prefer
lookup joins, you can instead store the data in a JDBC source and use JDBC
lookup joins. JDBC lookup sources can look up values with specific
keys, and you can tune the cache size by changing the configs here [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#lookup-cache
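
For illustration, a rough sketch of such a JDBC-backed dimension table with a
bounded lookup cache; the schema, URL and cache values below are made up, and
the 'lookup.cache.max-rows' / 'lookup.cache.ttl' options are the kind of
settings documented in [1]:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcLookupSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Dimension table backed by JDBC; the lookup cache keeps at most
        // 100k rows for 10 minutes, so only hot keys stay in memory.
        tEnv.executeSql(
                "CREATE TABLE account_dim (" +
                "  account_id BIGINT," +
                "  account_name STRING," +
                "  PRIMARY KEY (account_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://host:3306/db'," +
                "  'table-name' = 'account_dim'," +
                "  'lookup.cache.max-rows' = '100000'," +
                "  'lookup.cache.ttl' = '10min'" +
                ")");

        // A lookup join then fetches only the keys it actually needs, e.g.:
        // SELECT e.*, d.account_name
        // FROM events AS e
        // JOIN account_dim FOR SYSTEM_TIME AS OF e.proc_time AS d
        // ON e.account_id = d.account_id
    }
}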

Jason Yi <93t...@gmail.com> wrote on Tue, 8 Feb 2022 at 05:33:

> Hi Caizhi,
>
> Could you tell me more details about streaming joins that you suggested?
> Did you mean putting the Hive table data into a Kafka/Kinesis and joining
> the main stream with the hive table data streaming with a very long
> watermark?
>
> In my use case, the hive table is an account dimension table and I wanted
> to join an event stream with the account dimension in Flink. I thought a
> lookup table source would work for my use case, but I got a performance
> problem as mentioned above.
>
> If there's a good solution, I'm open to it. I just need to confirm if
> Flink is not happy with a large Hive table data.
>
> Jason.
>
>
> On Sun, Feb 6, 2022 at 7:01 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> Each parallelism of the lookup operation will load all data from the
>> lookup table source, so you're loading 10GB of data to each parallelism and
>> storing them in JVM memory. That is not only slow but also very
>> memory-consuming.
>>
>> Have you tried joining your main stream with the hive table directly
>> (that is, using streaming joins instead of lookup joins)? Does that meet
>> your need or why do you have to use lookup joins?
>>
>> Jason Yi <93t...@gmail.com> wrote on Sat, 5 Feb 2022 at 08:01:
>>
>>> Hello,
>>>
>>> I created external tables on Hive with data in s3 and wanted to use
>>> those tables as a lookup table in Flink.
>>>
>>> When I used an external table containing a small size of data as a
>>> lookup table, Flink quickly loaded the data into TM memory and did a
>>> Temporal join to an event stream. But, when I put an external table
>>> containing ~10GB of data, Flink took so long to load the data and finally
>>> returned a timeout error. (I set the heartbeat.timeout to 20)
>>>
>>> Is there a way to make Flink read Hive data faster? Or is this normal?
>>> MySQL lookup tables would be recommended when we have a large size of
>>> dimension data?
>>>
>>> Here's the test environment:
>>>  - 1.14.0 Flink
>>>  - EMR 6.5
>>>  - Hive 3.1.2 installed on EMR
>>>  - Hive with a default MetaStore on EMR used. (Not MySQL or Glue
>>> Metastore)
>>>  - Parquet source data in s3 for the external table on Hive
>>>
>>> Below is part of the Flink log produced while loading the Hive table
>>> data. Flink seemed to open one parquet file multiple times and moved to
>>> another parquet file to open. I wonder if this is normal. Why Flink didn't
>>> read data from multiple files in parallel. I'm not sure if this is a
>>> problem caused by the default Hive Metastore.
>>>
>>> ..
>>> 2022-02-04 22:42:54,839 INFO
>>>  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] -
>>> Populating lookup join cache
>>> 2022-02-04 22:42:54,839 INFO
>>>  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] -
>>> Populating lookup join cache
>>> 2022-02-04 22:42:55,083 INFO  org.apache.hadoop.mapred.FileInputFormat
>>>   [] - Total input files to process : 12
>>> 2022-02-04 22:42:55,084 INFO
>>>  org.apache.flink.connectors.hive.read.HiveTableInputFormat   [] - Use
>>> flink parquet ColumnarRowData reader.
>>> 2022-02-04 22:42:55,096 INFO  org.apache.hadoop.mapred.FileInputFormat
>>>   [] - Total input files to process : 12
>>> 2022-02-04 22:42:55,097 INFO
>>>  org.apache.flink.connectors.hive.read.HiveTableInputFormat   [] - Use
>>> flink parquet ColumnarRowData reader.
>>> 2022-02-04 22:42:55,105 INFO
>>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>>> for reading
>>> 2022-02-04 22:42:55,116 INFO
>>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>>> for reading
>>> 2022-02-04 22:42:55,169 INFO
>>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>>> for reading
>>> 2022-02-04 22:42:55,172 INFO
>>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>>> 

Re: Questions about Kryo setRegistrationRequired(false)

2022-02-07 Thread Shane Bishop
Thanks Chesnay Schepler.

I filed a ticket: https://issues.apache.org/jira/browse/FLINK-25993

My team will try disabling Kryo with ExecutionConfig#disableGenericTypes and 
see if we need to change our data types or not.
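
For reference, a minimal sketch of that switch (only
ExecutionConfig#disableGenericTypes itself comes from the suggestion above;
everything else is boilerplate):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKryoFallback {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fail whenever a data type would fall back to Kryo (GenericTypeInfo),
        // instead of silently serializing it with Kryo.
        env.getConfig().disableGenericTypes();

        // Any type Flink cannot analyze as a POJO/tuple will now surface as an
        // exception, which is where data-type changes may become necessary.
    }
}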

Best regards,
Shane


From: Chesnay Schepler 
Sent: February 7, 2022 3:08 AM
To: Shane Bishop ; user@flink.apache.org 

Subject: Re: Questions about Kryo setRegistrationRequired(false)

There isn't any setting to control setRegistrationRequired().

You can however turn Kryo off via ExecutionConfig#disableGenericTypes, although 
this may require changes to your data types.

I'd recommend to file a ticket.

On 04/02/2022 20:12, Shane Bishop wrote:
Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink 
might introduce serialization/deserialization vulnerabilities, and I want to 
better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo (link) regarding type registration. 
The "fix" the Kryo developers made was to make setRegistrationRequired(true) 
the default (see the comment on the GitHub issue, the commit with this fix, 
and the line in the commit that is the fix).

This is not a true fix, as the default can still be overridden. This only sets 
a safe default.

In Flink, the default of true is overridden in the 1.14.3 Flink release (see 
KryoSerializer.java and FlinkScalaKryoInstantiator.scala).

I am no Flink contributor, so I might be missing safety mechanisms that are in 
place to prevent the Kryo serialization/deserialization vulnerability even when 
registration required is set to false. Are there any such safety mechanisms in 
place?

Is there anything I can do as a user of Flink to protect myself against this 
Kryo vulnerability?

Best regards,
Shane Bishop



Re: pyflink datastream job

2022-02-07 Thread nagi data monkey
ah, thanks, those doc pages are what I missed!

On Mon, Feb 7, 2022 at 2:48 AM Dian Fu  wrote:

> The following code snippet should just work:
>
> ```
> from pyflink.datastream import StreamExecutionEnvironment
> env = StreamExecutionEnvironment.get_execution_environment()
> ```
>
> It works both in local deployment and in flink clusters.
>
> You could refer to [1] on how to submit PyFlink jobs to a remote cluster.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs
>
> Regards,
> Dian
>
>
> On Mon, Feb 7, 2022 at 12:09 AM nagi data monkey 
> wrote:
>
>> Hi all,
>>
>> Anyone got a pyflink datastream job working? I think I'm having
>> difficulty seeing a small flunk cluster I've set up in docker. I can't see
>> any way that pyflink can pick up a Remote Execution Enviroment. This is the
>> only 'compiling' code snippet I can find:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> env = StreamExecutionEnvironment.get_execution_environment()
>>
>> which at least allows me to run pyflink code, but not see any flink
>> clusters etc. Any ideas how I'm meant to actually get a pyflink job running?
>>
>> TIA
>>
>


Re: Loading a Hive lookup table data into TM memory takes so long.

2022-02-07 Thread Jason Yi
Hi Caizhi,

Could you tell me more details about streaming joins that you suggested?
Did you mean putting the Hive table data into a Kafka/Kinesis and joining
the main stream with the hive table data streaming with a very long
watermark?

In my use case, the hive table is an account dimension table and I wanted
to join an event stream with the account dimension in Flink. I thought a
lookup table source would work for my use case, but I got a performance
problem as mentioned above.

If there's a good solution, I'm open to it. I just need to confirm if Flink
is not happy with a large Hive table data.

Jason.


On Sun, Feb 6, 2022 at 7:01 PM Caizhi Weng  wrote:

> Hi!
>
> Each parallelism of the lookup operation will load all data from the
> lookup table source, so you're loading 10GB of data to each parallelism and
> storing them in JVM memory. That is not only slow but also very
> memory-consuming.
>
> Have you tried joining your main stream with the hive table directly (that
> is, using streaming joins instead of lookup joins)? Does that meet your
> need or why do you have to use lookup joins?
>
> Jason Yi <93t...@gmail.com> wrote on Sat, 5 Feb 2022 at 08:01:
>
>> Hello,
>>
>> I created external tables on Hive with data in s3 and wanted to use those
>> tables as a lookup table in Flink.
>>
>> When I used an external table containing a small size of data as a lookup
>> table, Flink quickly loaded the data into TM memory and did a Temporal join
>> to an event stream. But, when I put an external table containing ~10GB of
>> data, Flink took so long to load the data and finally returned a timeout
>> error. (I set the heartbeat.timeout to 20)
>>
>> Is there a way to make Flink read Hive data faster? Or is this normal?
>> MySQL lookup tables would be recommended when we have a large size of
>> dimension data?
>>
>> Here's the test environment:
>>  - 1.14.0 Flink
>>  - EMR 6.5
>>  - Hive 3.1.2 installed on EMR
>>  - Hive with a default MetaStore on EMR used. (Not MySQL or Glue
>> Metastore)
>>  - Parquet source data in s3 for the external table on Hive
>>
>> Below is part of the Flink log produced while loading the Hive table
>> data. Flink seemed to open one parquet file multiple times and moved to
>> another parquet file to open. I wonder if this is normal. Why Flink didn't
>> read data from multiple files in parallel. I'm not sure if this is a
>> problem caused by the default Hive Metastore.
>>
>> ..
>> 2022-02-04 22:42:54,839 INFO
>>  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] -
>> Populating lookup join cache
>> 2022-02-04 22:42:54,839 INFO
>>  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] -
>> Populating lookup join cache
>> 2022-02-04 22:42:55,083 INFO  org.apache.hadoop.mapred.FileInputFormat
>>   [] - Total input files to process : 12
>> 2022-02-04 22:42:55,084 INFO
>>  org.apache.flink.connectors.hive.read.HiveTableInputFormat   [] - Use
>> flink parquet ColumnarRowData reader.
>> 2022-02-04 22:42:55,096 INFO  org.apache.hadoop.mapred.FileInputFormat
>>   [] - Total input files to process : 12
>> 2022-02-04 22:42:55,097 INFO
>>  org.apache.flink.connectors.hive.read.HiveTableInputFormat   [] - Use
>> flink parquet ColumnarRowData reader.
>> 2022-02-04 22:42:55,105 INFO
>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>> for reading
>> 2022-02-04 22:42:55,116 INFO
>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>> for reading
>> 2022-02-04 22:42:55,169 INFO
>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>> for reading
>> 2022-02-04 22:42:55,172 INFO
>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>> for reading
>> 2022-02-04 22:42:57,782 INFO
>>  org.apache.flink.connectors.hive.read.HiveTableInputFormat   [] - Use
>> flink parquet ColumnarRowData reader.
>> 2022-02-04 22:42:57,783 INFO
>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>> for reading
>> 2022-02-04 22:42:57,799 INFO
>>  org.apache.flink.connectors.hive.read.HiveTableInputFormat   [] - Use
>> flink parquet ColumnarRowData reader.
>> 2022-02-04 22:42:57,801 INFO
>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem   [] - Opening
>> 's3://bucket/path/to/files/part-0-55f0ff62-bf83-4eac-8ce8-308bd9efda24-c000.snappy.parquet'
>> for reading
>> 2022-02-04 22:42:57,851 INFO
>>  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem  

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Ok I think Ali's solution makes the most sense to me. I'll try it and let
you know.

On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:

> Hi John,
>
> your getKey() implementation shows that it is not deterministic, since
> calling it with the same click instance multiple times will return
> different keys. For example a call at 12:01:59.950 and a call at
> 12:02:00.050 with the same click instance will return two different keys:
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>
> best regards
> Jing
>
> On Mon, Feb 7, 2022 at 5:07 PM John Smith  wrote:
>
>> Maybe there's a misunderstanding. But basically I want to do clickstream
>> count for a given "url" and for simplicity and accuracy of the count base
>> it on processing time (event time doesn't matter as long as I get a total
>> of clicks at that given processing time)
>>
>> So regardless of the event time. I want all clicks for the current
>> processing time rounded to the minute per link.
>>
>> So, if now was 2022-04-07T12:01:00.000Z
>>
>> Then I would want the following result...
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>> 
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>> And so on...
>>
>> @Override
>> public MyEventCountKey getKey(final MyEvent click) throws Exception
>> {
>> MyEventCountKey key = new MyEventCountKey(
>> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>> click.getDomain(), // cnn.com
>> click.getPath(), // /some-article-name
>> );
>> return key;
>> }
>>
>>
>>
>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>>
>>> The key selector works.
>>>
>>>
>>> No it does not ;) It depends on the system time so it's not
>>> deterministic (you can get different keys for the very same element).
>>>
>>> How do you key a count based on the time. I have taken this from samples
 online.

>>>
>>> This is what the windowing is for. You basically want to group / combine
>>> elements per key and event time window [1].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith 
>>> wrote:
>>>
 The key selector works. It only causes an issue if there too many keys
 produced in one shot. For example of 100 "same" keys are produced for that
 1 minutes it's ok. But if 101 are produced the error happens.


 If you look at the reproducer at least that's what's hapenning

 How do you key a count based on the time. I have taken this from
 samples online.

 The key is that particular time for that particular URL path.

 So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00

 On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
 wrote:

> Your Key selector doesn't need to implement hashCode, but given the
> same object it has to return the same key.
> In your reproducer the returned key will have different timestamps,
> and since the timestamp is included in the hashCode, they will be 
> different
> each time.
>
> On 07/02/2022 14:50, John Smith wrote:
>
> I don't get it? I provided the reproducer. I implemented the interface
> to Key selector it needs hashcode and equals as well?
>
> I'm attempting to do click stream. So the key is based on processing
> date/time rounded to the minute + domain name + path
>
> So these should be valid below?
>
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
>
> 2022-01-01T10:02:00 + cnn.com + /article2
>
> 2022-01-01T10:03:00 + cnn.com + /article1
> 2022-01-01T10:03:00 + cnn.com + /article1
>
> 2022-01-01T10:03:00 + cnn.com + /article3
> 2022-01-01T10:03:00 + cnn.com + /article3
>
> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
> wrote:
>
>> Don't KeySelectors also need to be deterministic?
>>
>> * The {@link KeySelector} allows to use deterministic objects for 
>> operations such as reduce,* reduceGroup, join, coGroup, etc. *If invoked 
>> multiple times on the same object, the returned key*** must be the same.*
>>
>>
>> On 04/02/2022 18:25, John Smith wrote:
>>
>> Hi Francesco,  here is the reproducer:
>> https://github.com/javadevmtl/flink-key-reproducer
>>
>> So, essentially it looks like when there's a high influx of records
>> produced from the source that the Exception is thrown.
>>
>> The key is 

Re: Reading from Kafka kafkarecorddeserializationschema

2022-02-07 Thread HG
Hi Dawid

I am a little bit worried by the code because of the ByteBuffer and the
endianness.
Do I really need to worry about them and determine them too?

Or was it just convenient to use a ByteBuffer and set the endianness here?

Regards Hans

public Event deserialize(byte[] message) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
int address = buffer.getInt(0);
int typeOrdinal = buffer.getInt(4);
return new Event(EventType.values()[typeOrdinal], address);
}
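
For comparison, a sketch assuming the producer wrote the two ints in
big-endian; the byte order in the deserializer only has to match whatever the
producer used when encoding the bytes, and ByteBuffer defaults to big-endian
when no order is set:

public Event deserialize(byte[] message) throws IOException {
    // Assumption: the producer wrote both 4-byte ints in big-endian
    // (ByteBuffer's default when order() is not called).
    ByteBuffer buffer = ByteBuffer.wrap(message);
    int address = buffer.getInt(0);
    int typeOrdinal = buffer.getInt(4);
    return new Event(EventType.values()[typeOrdinal], address);
}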





On Fri, 4 Feb 2022 at 14:42, Dawid Wysakowicz  wrote:

> Hi,
>
> You can use DeserializationSchema with KafkaSource as well. You can pass
> it to the KafkaSource.builder():
>
> KafkaSource.<...>builder()
>
> .setDeserializer(...)
>
> You can also take a look at the StateMachineExample[1], where KafkaSource
> is used.
>
> BTW, have you tried looking at Table API? It would abstract quite a few
> things for you, e.g. translation of what I presume is a CSV format[2] in
> your case.
>
> Best,
>
> Dawid
>
> [1]
> https://github.com/apache/flink/blob/5846d8d61b4b2aa10f925e9f63885cb7f6686303/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L104
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/
> On 03/02/2022 16:56, HG wrote:
>
> Hello
>
> Most examples available still use the FlinkKafkaConsumer unfortunately.
> I need to consume events from Kafka.
> The format is Long,Timestamp,String,String.
>
> Do I need to create a custom deserializer?
>
> What also confuses me is
>
> KafkaSource** source = KafkaSource
>
> How does it relate to the deserializer?
> Is there a kind of  type or is  fine even if the message is a
> composite of Long,String...?
>
> Regards Hans
>
>


Re: How to proper hashCode() for keys.

2022-02-07 Thread Jing Ge
Hi John,

your getKey() implementation shows that it is not deterministic, since
calling it with the same click instance multiple times will return
different keys. For example a call at 12:01:59.950 and a call at
12:02:00.050 with the same click instance will return two different keys:

2022-04-07T12:01:00.000Z|cnn.com|some-article-name
2022-04-07T12:02:00.000Z|cnn.com|some-article-name

best regards
Jing

On Mon, Feb 7, 2022 at 5:07 PM John Smith  wrote:

> Maybe there's a misunderstanding. But basically I want to do clickstream
> count for a given "url" and for simplicity and accuracy of the count base
> it on processing time (event time doesn't matter as long as I get a total
> of clicks at that given processing time)
>
> So regardless of the event time. I want all clicks for the current
> processing time rounded to the minute per link.
>
> So, if now was 2022-04-07T12:01:00.000Z
>
> Then I would want the following result...
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
> 
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
> And so on...
>
> @Override
> public MyEventCountKey getKey(final MyEvent click) throws Exception
> {
> MyEventCountKey key = new MyEventCountKey(
> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
> click.getDomain(), // cnn.com
> click.getPath(), // /some-article-name
> );
> return key;
> }
>
>
>
> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>
>> The key selector works.
>>
>>
>> No it does not ;) It depends on the system time so it's not deterministic
>> (you can get different keys for the very same element).
>>
>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>
>> This is what the windowing is for. You basically want to group / combine
>> elements per key and event time window [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>
>> Best,
>> D.
>>
>> On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:
>>
>>> The key selector works. It only causes an issue if there too many keys
>>> produced in one shot. For example of 100 "same" keys are produced for that
>>> 1 minutes it's ok. But if 101 are produced the error happens.
>>>
>>>
>>> If you look at the reproducer at least that's what's hapenning
>>>
>>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>> The key is that particular time for that particular URL path.
>>>
>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>
>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
>>> wrote:
>>>
 Your Key selector doesn't need to implement hashCode, but given the
 same object it has to return the same key.
 In your reproducer the returned key will have different timestamps, and
 since the timestamp is included in the hashCode, they will be different
 each time.

 On 07/02/2022 14:50, John Smith wrote:

 I don't get it? I provided the reproducer. I implemented the interface
 to Key selector it needs hashcode and equals as well?

 I'm attempting to do click stream. So the key is based on processing
 date/time rounded to the minute + domain name + path

 So these should be valid below?

 2022-01-01T10:02:00 + cnn.com + /article1
 2022-01-01T10:02:00 + cnn.com + /article1
 2022-01-01T10:02:00 + cnn.com + /article1

 2022-01-01T10:02:00 + cnn.com + /article2

 2022-01-01T10:03:00 + cnn.com + /article1
 2022-01-01T10:03:00 + cnn.com + /article1

 2022-01-01T10:03:00 + cnn.com + /article3
 2022-01-01T10:03:00 + cnn.com + /article3

 On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
 wrote:

> Don't KeySelectors also need to be deterministic?
>
> * The {@link KeySelector} allows to use deterministic objects for 
> operations such as reduce,* reduceGroup, join, coGroup, etc. *If invoked 
> multiple times on the same object, the returned key*** must be the same.*
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco,  here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> So, essentially it looks like when there's a high influx of records
> produced from the source that the Exception is thrown.
>
> The key is generated by 3 values: date/time rounded to the minute and
> 2 strings.
> So you will see keys as follows...
> 2022-02-04T17:20:00Z|foo|bar
> 2022-02-04T17:21:00Z|foo|bar
> 2022-02-04T17:22:00Z|foo|bar
>
> The reproducer has a custom source that basically produces 

Re: How to proper hashCode() for keys.

2022-02-07 Thread Ali Bahadir Zeybek
Hello John,

During the execution lifecycle of a given event, the key is not passed
between operators; it is recomputed with the given key selector every time
a (keyed) operator sees the event. Therefore, the same event, within the
same pipeline, could be assigned a different key while moving along the
graph of operators. This part of your key selector is not deterministic,
since it depends on the time at which the key selector function is
executed. My suggestion would be to materialise the key as an additional
field of your event at the beginning of the pipeline and then use
that field as the key.
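
A rough sketch of that suggestion (imports omitted; MyEvent and its getters
come from the earlier mails, everything else, including the 1-minute
truncation, is assumed):

// Compute the key string exactly once, when the event enters the pipeline.
DataStream<Tuple2<String, MyEvent>> withKey = clicks
        .map(click -> {
            String windowStart = Instant.now()
                    .truncatedTo(ChronoUnit.MINUTES)   // or however the job rounds time
                    .toString();
            return Tuple2.of(windowStart + "|" + click.getDomain() + "|" + click.getPath(), click);
        })
        .returns(Types.TUPLE(Types.STRING, Types.POJO(MyEvent.class)));

// The key selector now only reads the stored field, so it returns the same
// key no matter when or on which operator it is evaluated.
KeyedStream<Tuple2<String, MyEvent>, String> keyed = withKey.keyBy(t -> t.f0);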

Sincerely,

Ali

On Mon, Feb 7, 2022 at 7:06 PM John Smith  wrote:

> Maybe there's a misunderstanding. But basically I want to do clickstream
> count for a given "url" and for simplicity and accuracy of the count base
> it on processing time (event time doesn't matter as long as I get a total
> of clicks at that given processing time)
>
> So regardless of the event time. I want all clicks for the current
> processing time rounded to the minute per link.
>
> So, if now was 2022-04-07T12:01:00.000Z
>
> Then I would want the following result...
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
> 
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
> And so on...
>
> @Override
> public MyEventCountKey getKey(final MyEvent click) throws Exception
> {
> MyEventCountKey key = new MyEventCountKey(
> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
> click.getDomain(), // cnn.com
> click.getPath(), // /some-article-name
> );
> return key;
> }
>
>
>
> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>
>> The key selector works.
>>
>>
>> No it does not ;) It depends on the system time so it's not deterministic
>> (you can get different keys for the very same element).
>>
>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>
>> This is what the windowing is for. You basically want to group / combine
>> elements per key and event time window [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>
>> Best,
>> D.
>>
>> On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:
>>
>>> The key selector works. It only causes an issue if there too many keys
>>> produced in one shot. For example of 100 "same" keys are produced for that
>>> 1 minutes it's ok. But if 101 are produced the error happens.
>>>
>>>
>>> If you look at the reproducer at least that's what's hapenning
>>>
>>> How do you key a count based on the time. I have taken this from samples
>>> online.
>>>
>>> The key is that particular time for that particular URL path.
>>>
>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>
>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
>>> wrote:
>>>
 Your Key selector doesn't need to implement hashCode, but given the
 same object it has to return the same key.
 In your reproducer the returned key will have different timestamps, and
 since the timestamp is included in the hashCode, they will be different
 each time.

 On 07/02/2022 14:50, John Smith wrote:

 I don't get it? I provided the reproducer. I implemented the interface
 to Key selector it needs hashcode and equals as well?

 I'm attempting to do click stream. So the key is based on processing
 date/time rounded to the minute + domain name + path

 So these should be valid below?

 2022-01-01T10:02:00 + cnn.com + /article1
 2022-01-01T10:02:00 + cnn.com + /article1
 2022-01-01T10:02:00 + cnn.com + /article1

 2022-01-01T10:02:00 + cnn.com + /article2

 2022-01-01T10:03:00 + cnn.com + /article1
 2022-01-01T10:03:00 + cnn.com + /article1

 2022-01-01T10:03:00 + cnn.com + /article3
 2022-01-01T10:03:00 + cnn.com + /article3

 On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
 wrote:

> Don't KeySelectors also need to be deterministic?
>
> * The {@link KeySelector} allows to use deterministic objects for 
> operations such as reduce,* reduceGroup, join, coGroup, etc. *If invoked 
> multiple times on the same object, the returned key*** must be the same.*
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco,  here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> So, essentially it looks like when there's a high influx of records
> produced from the source that the Exception is thrown.
>
> The key is generated by 3 values: date/time rounded to 

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Maybe there's a misunderstanding. Basically I want to do a clickstream
count for a given "url", and for simplicity and accuracy of the count I
base it on processing time (event time doesn't matter as long as I get a
total of clicks at that given processing time).

So, regardless of the event time, I want all clicks for the current
processing time rounded to the minute, per link.

So, if now was 2022-04-07T12:01:00.000Z

Then I would want the following result...

2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15

2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
And so on...

@Override
public MyEventCountKey getKey(final MyEvent click) throws Exception {
    MyEventCountKey key = new MyEventCountKey(
            Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
                    ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
            click.getDomain(), // cnn.com
            click.getPath()    // /some-article-name
    );
    return key;
}



On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:

> The key selector works.
>
>
> No it does not ;) It depends on the system time so it's not deterministic
> (you can get different keys for the very same element).
>
> How do you key a count based on the time. I have taken this from samples
>> online.
>>
>
> This is what the windowing is for. You basically want to group / combine
> elements per key and event time window [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>
> Best,
> D.
>
> On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:
>
>> The key selector works. It only causes an issue if there too many keys
>> produced in one shot. For example of 100 "same" keys are produced for that
>> 1 minutes it's ok. But if 101 are produced the error happens.
>>
>>
>> If you look at the reproducer at least that's what's hapenning
>>
>> How do you key a count based on the time. I have taken this from samples
>> online.
>>
>> The key is that particular time for that particular URL path.
>>
>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>
>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
>> wrote:
>>
>>> Your Key selector doesn't need to implement hashCode, but given the same
>>> object it has to return the same key.
>>> In your reproducer the returned key will have different timestamps, and
>>> since the timestamp is included in the hashCode, they will be different
>>> each time.
>>>
>>> On 07/02/2022 14:50, John Smith wrote:
>>>
>>> I don't get it? I provided the reproducer. I implemented the interface
>>> to Key selector it needs hashcode and equals as well?
>>>
>>> I'm attempting to do click stream. So the key is based on processing
>>> date/time rounded to the minute + domain name + path
>>>
>>> So these should be valid below?
>>>
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>
>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>
>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>
>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>
>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
>>> wrote:
>>>
 Don't KeySelectors also need to be deterministic?

 * The {@link KeySelector} allows to use deterministic objects for 
 operations such as reduce,* reduceGroup, join, coGroup, etc. *If invoked 
 multiple times on the same object, the returned key*** must be the same.*


 On 04/02/2022 18:25, John Smith wrote:

 Hi Francesco,  here is the reproducer:
 https://github.com/javadevmtl/flink-key-reproducer

 So, essentially it looks like when there's a high influx of records
 produced from the source that the Exception is thrown.

 The key is generated by 3 values: date/time rounded to the minute and 2
 strings.
 So you will see keys as follows...
 2022-02-04T17:20:00Z|foo|bar
 2022-02-04T17:21:00Z|foo|bar
 2022-02-04T17:22:00Z|foo|bar

 The reproducer has a custom source that basically produces a record in
 a loop and sleeps for a specified period of milliseconds 100ms in this 
 case.
 The lower the sleep delay the faster records are produced the more
 chances the exception is thrown. With a 100ms delay it's always thrown.
 Setting a 2000 to 3000ms will guarantee it to work.
 The original job uses a Kafka Source so it should technically be able
 to handle even a couple thousand records per second.


 On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:

> Ok it's not my data either. I think it may be a volume issue. I have
> managed to consistently 

Re: How to proper hashCode() for keys.

2022-02-07 Thread David Morávek
>
> The key selector works.


No it does not ;) It depends on the system time so it's not deterministic
(you can get different keys for the very same element).

How do you key a count based on the time. I have taken this from samples
> online.
>

This is what the windowing is for. You basically want to group / combine
elements per key and event time window [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
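
A rough sketch of the windowed variant, using processing-time tumbling
windows since the count is processing-time based (imports omitted; "clicks"
is the DataStream<MyEvent> from the earlier mails and the output tuple shape
is just an example):

DataStream<Tuple3<String, String, Long>> countsPerMinute = clicks
        .keyBy(click -> click.getDomain() + "|" + click.getPath())
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .aggregate(
                new AggregateFunction<MyEvent, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(MyEvent value, Long acc) { return acc + 1; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                },
                new ProcessWindowFunction<Long, Tuple3<String, String, Long>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<Long> elements,
                                        Collector<Tuple3<String, String, Long>> out) {
                        // The window start replaces the hand-rounded timestamp in the key.
                        String windowStart = Instant.ofEpochMilli(ctx.window().getStart()).toString();
                        out.collect(Tuple3.of(windowStart, key, elements.iterator().next()));
                    }
                });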

Best,
D.

On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:

> The key selector works. It only causes an issue if there too many keys
> produced in one shot. For example of 100 "same" keys are produced for that
> 1 minutes it's ok. But if 101 are produced the error happens.
>
>
> If you look at the reproducer at least that's what's hapenning
>
> How do you key a count based on the time. I have taken this from samples
> online.
>
> The key is that particular time for that particular URL path.
>
> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>
> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
> wrote:
>
>> Your Key selector doesn't need to implement hashCode, but given the same
>> object it has to return the same key.
>> In your reproducer the returned key will have different timestamps, and
>> since the timestamp is included in the hashCode, they will be different
>> each time.
>>
>> On 07/02/2022 14:50, John Smith wrote:
>>
>> I don't get it? I provided the reproducer. I implemented the interface to
>> Key selector it needs hashcode and equals as well?
>>
>> I'm attempting to do click stream. So the key is based on processing
>> date/time rounded to the minute + domain name + path
>>
>> So these should be valid below?
>>
>> 2022-01-01T10:02:00 + cnn.com + /article1
>> 2022-01-01T10:02:00 + cnn.com + /article1
>> 2022-01-01T10:02:00 + cnn.com + /article1
>>
>> 2022-01-01T10:02:00 + cnn.com + /article2
>>
>> 2022-01-01T10:03:00 + cnn.com + /article1
>> 2022-01-01T10:03:00 + cnn.com + /article1
>>
>> 2022-01-01T10:03:00 + cnn.com + /article3
>> 2022-01-01T10:03:00 + cnn.com + /article3
>>
>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
>> wrote:
>>
>>> Don't KeySelectors also need to be deterministic?
>>>
>>> * The {@link KeySelector} allows to use deterministic objects for 
>>> operations such as reduce,* reduceGroup, join, coGroup, etc. *If invoked 
>>> multiple times on the same object, the returned key*** must be the same.*
>>>
>>>
>>> On 04/02/2022 18:25, John Smith wrote:
>>>
>>> Hi Francesco,  here is the reproducer:
>>> https://github.com/javadevmtl/flink-key-reproducer
>>>
>>> So, essentially it looks like when there's a high influx of records
>>> produced from the source that the Exception is thrown.
>>>
>>> The key is generated by 3 values: date/time rounded to the minute and 2
>>> strings.
>>> So you will see keys as follows...
>>> 2022-02-04T17:20:00Z|foo|bar
>>> 2022-02-04T17:21:00Z|foo|bar
>>> 2022-02-04T17:22:00Z|foo|bar
>>>
>>> The reproducer has a custom source that basically produces a record in a
>>> loop and sleeps for a specified period of milliseconds 100ms in this case.
>>> The lower the sleep delay the faster records are produced the more
>>> chances the exception is thrown. With a 100ms delay it's always thrown.
>>> Setting a 2000 to 3000ms will guarantee it to work.
>>> The original job uses a Kafka Source so it should technically be able to
>>> handle even a couple thousand records per second.
>>>
>>>
>>> On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:
>>>
 Ok it's not my data either. I think it may be a volume issue. I have
 managed to consistently reproduce the error. I'll upload a reproducer ASAP.



 On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:

> Ok so I tried to create a reproducer but I couldn't reproduce it. But
> the actual job once in a while throws that error. So I'm wondering if 
> maybe
> one of the records that comes in is not valid, though I do validate prior
> to getting to the key and window operators.
>
> On Thu, 3 Feb 2022 at 14:32, John Smith 
> wrote:
>
>> Actually maybe not because with PrintSinkFunction it ran for a bit
>> and then it threw the error.
>>
>> On Thu, 3 Feb 2022 at 14:24, John Smith 
>> wrote:
>>
>>> Ok it may be the ElasticSearch connector causing the issue?
>>>
>>> If I use PrintSinkFunction then I get no error and my stats print as
>>> expected.
>>>
>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>> france...@ververica.com> wrote:
>>>
 Hi,
 your hash code and equals seems correct. Can you post a minimum
 stream pipeline reproducer using this class?

 FG

 On Tue, Feb 1, 2022 at 8:39 PM John Smith 
 wrote:

> Hi, getting java.lang.IllegalArgumentException: Key group 39 is
> not in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless 

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
The key selector works. It only causes an issue if too many keys are
produced in one shot. For example, if 100 "same" keys are produced for that
1 minute it's ok, but if 101 are produced the error happens.


If you look at the reproducer, at least that's what's happening.

How do you key a count based on the time? I have taken this from samples
online.

The key is that particular time for that particular URL path.

So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00

On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
wrote:

> Your Key selector doesn't need to implement hashCode, but given the same
> object it has to return the same key.
> In your reproducer the returned key will have different timestamps, and
> since the timestamp is included in the hashCode, they will be different
> each time.
>
> On 07/02/2022 14:50, John Smith wrote:
>
> I don't get it? I provided the reproducer. I implemented the interface to
> Key selector it needs hashcode and equals as well?
>
> I'm attempting to do click stream. So the key is based on processing
> date/time rounded to the minute + domain name + path
>
> So these should be valid below?
>
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
>
> 2022-01-01T10:02:00 + cnn.com + /article2
>
> 2022-01-01T10:03:00 + cnn.com + /article1
> 2022-01-01T10:03:00 + cnn.com + /article1
>
> 2022-01-01T10:03:00 + cnn.com + /article3
> 2022-01-01T10:03:00 + cnn.com + /article3
>
> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
> wrote:
>
>> Don't KeySelectors also need to be deterministic?
>>
>> * The {@link KeySelector} allows to use deterministic objects for operations 
>> such as reduce,* reduceGroup, join, coGroup, etc. *If invoked multiple times 
>> on the same object, the returned key*** must be the same.*
>>
>>
>> On 04/02/2022 18:25, John Smith wrote:
>>
>> Hi Francesco,  here is the reproducer:
>> https://github.com/javadevmtl/flink-key-reproducer
>>
>> So, essentially it looks like when there's a high influx of records
>> produced from the source that the Exception is thrown.
>>
>> The key is generated by 3 values: date/time rounded to the minute and 2
>> strings.
>> So you will see keys as follows...
>> 2022-02-04T17:20:00Z|foo|bar
>> 2022-02-04T17:21:00Z|foo|bar
>> 2022-02-04T17:22:00Z|foo|bar
>>
>> The reproducer has a custom source that basically produces a record in a
>> loop and sleeps for a specified period of milliseconds 100ms in this case.
>> The lower the sleep delay the faster records are produced the more
>> chances the exception is thrown. With a 100ms delay it's always thrown.
>> Setting a 2000 to 3000ms will guarantee it to work.
>> The original job uses a Kafka Source so it should technically be able to
>> handle even a couple thousand records per second.
>>
>>
>> On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:
>>
>>> Ok it's not my data either. I think it may be a volume issue. I have
>>> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>>>
>>>
>>>
>>> On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:
>>>
 Ok so I tried to create a reproducer but I couldn't reproduce it. But
 the actual job once in a while throws that error. So I'm wondering if maybe
 one of the records that comes in is not valid, though I do validate prior
 to getting to the key and window operators.

 On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:

> Actually maybe not because with PrintSinkFunction it ran for a bit and
> then it threw the error.
>
> On Thu, 3 Feb 2022 at 14:24, John Smith 
> wrote:
>
>> Ok it may be the ElasticSearch connector causing the issue?
>>
>> If I use PrintSinkFunction then I get no error and my stats print as
>> expected.
>>
>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>>> Hi,
>>> your hash code and equals seems correct. Can you post a minimum
>>> stream pipeline reproducer using this class?
>>>
>>> FG
>>>
>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>>> wrote:
>>>
 Hi, getting java.lang.IllegalArgumentException: Key group 39 is not
 in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're 
 directly
 using low level state access APIs, this is most likely caused by
 non-deterministic shuffle key (hashCode and equals implementation).

 This is my class, is my hashCode deterministic?

public final class MyEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    public MyEventCountKey(final String countDateTime, final String domain, final String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;
        this.event = event;
    }

 

Re: Example with JSONKeyValueDeserializationSchema?

2022-02-07 Thread HG
Hello Kamil et all,

When I build this code:

KafkaSource source = KafkaSource.builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type",trustStoreType)
.setProperty("ssl.truststore.password",trustStorePassword)
.setProperty("ssl.truststore.location",trustStoreLocation)
.setProperty("security.protocol",securityProtocol)
.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setDeserializer(new JSONKeyValueDeserializationSchema(false))

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();


I get this error:

error: incompatible types:
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
cannot be converted to
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
.setDeserializer(new
JSONKeyValueDeserializationSchema(false))


What am I doing wrong?
As per the documentation JSONKeyValueDeserializationSchema returns an
ObjectNode.
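
One possible fix, assuming the Flink 1.14 KafkaSource API:
JSONKeyValueDeserializationSchema implements the older
KafkaDeserializationSchema interface, so it apparently has to be wrapped with
KafkaRecordDeserializationSchema.of(...) before being passed to
setDeserializer, roughly:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        // Wrap the old-style schema so it matches the type setDeserializer() expects.
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(false)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();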

Regards Hans-Peter



On Fri, 14 Jan 2022 at 20:32, Kamil ty  wrote:

> Hello Hans,
>
> As far as I know the JSONKeyValueDeserializationSchema returns a Jackson
> ObjectNode. Below I have included an example based on Flink stable
> documentation.
>
> KafkaSource source = KafkaSource.builder()
> .setBootstrapServers(brokers)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setDeserializer(new JSONKeyValueDeserializationSchema(false))
> .build();
>
> DataStream ds = env.fromSource(source,
> WatermarkStrategy.noWatermarks(), "Kafka Source");
> // Below we access the JSON field stored in the ObjectNode.
> DataStream processedDs = ds.map(record ->
> record.get("value").get("my-field").asText());
>
> It is also possible to implement your own deserialization schema that for
> eg. could turn each record into a POJO. You can do this by implementing the
> KafkaDeserializationSchema (Flink : 1.14-SNAPSHOT API) (apache.org).
> If you are only interested in the value of the Kafka record, you can also
> extend the AbstractDeserializationSchema (Flink : 1.14-SNAPSHOT API)
> (apache.org) and use .setValueOnlyDeserializer(new CustomDeserializer()).
> There is also a different API that you could use for this which is
> specified here: KafkaSourceBuilder (Flink : 1.14-SNAPSHOT API) (apache.org).
> Although the customDeserializer will be the same for older Flink versions,
> the Kafka Source has appeared recently; to learn about the previous kafka
> source (FlinkKafkaConsumer) see: Kafka | Apache Flink.
>
> Best Regards
> Kamil
>
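
To make the POJO route above concrete, here is a minimal sketch of a value-only
deserializer; MyEvent and its JSON mapping are assumptions, not part of the original
thread, and it assumes Jackson is on the classpath:

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// A hypothetical POJO target; replace MyEvent with whatever your nested JSON maps onto.
public class MyEventDeserializer extends AbstractDeserializationSchema<MyEvent> {
    private static final long serialVersionUID = 1L;
    private transient ObjectMapper mapper;

    @Override
    public void open(InitializationContext context) {
        // Created here (not in the constructor) so the schema itself stays serializable.
        mapper = new ObjectMapper();
    }

    @Override
    public MyEvent deserialize(byte[] message) throws IOException {
        // Jackson maps the (possibly nested) JSON record onto the POJO fields.
        return mapper.readValue(message, MyEvent.class);
    }
}

It would then be plugged in with .setValueOnlyDeserializer(new MyEventDeserializer()).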
> On Fri, 14 Jan 2022 at 18:37, HG  wrote:
>
>> Hi,
>>
>> Before starting programming myself I'd like to know whether there are
>> good examples with deserialization of JSON that I can borrow.
>> The structure of the JSON is nested with multiple levels.
>>
>> Any references?
>>
>> 'better well stolen than badly invented myself' we'd say in Dutch
>>
>> Regards Hans
>>
>


Re: How to proper hashCode() for keys.

2022-02-07 Thread Chesnay Schepler
Your Key selector doesn't need to implement hashCode, but given the same 
object it has to return the same key.
In your reproducer the returned key will have different timestamps, and 
since the timestamp is included in the hashCode, they will be different 
each time.
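
For reference, a deterministic variant would derive the minute from the record itself
rather than from the wall clock. A sketch (the MyEvent type and its getters are
assumptions, not taken from the reproducer):

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import org.apache.flink.api.java.functions.KeySelector;

// The key is built only from fields carried by the record, never from processing
// time, so invoking the selector twice on the same record yields an equal key.
public class MyEventKeySelector implements KeySelector<MyEvent, MyEventCountKey> {
    @Override
    public MyEventCountKey getKey(MyEvent event) {
        String minute = Instant.ofEpochMilli(event.getEventTimeMillis())
                .truncatedTo(ChronoUnit.MINUTES)
                .toString();
        return new MyEventCountKey(minute, event.getDomain(), event.getPath());
    }
}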


On 07/02/2022 14:50, John Smith wrote:
I don't get it? I provided the reproducer. I implemented the interface 
to Key selector it needs hashcode and equals as well?


I'm attempting to do click stream. So the key is based on processing 
date/time rounded to the minute + domain name + path


So these should be valid below?

2022-01-01T10:02:00 + cnn.com  + /article1
2022-01-01T10:02:00 + cnn.com  + /article1
2022-01-01T10:02:00 + cnn.com  + /article1

2022-01-01T10:02:00 + cnn.com  + /article2

2022-01-01T10:03:00 + cnn.com  + /article1
2022-01-01T10:03:00 + cnn.com  + /article1

2022-01-01T10:03:00 + cnn.com  + /article3
2022-01-01T10:03:00 + cnn.com  + /article3

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
 wrote:


Don't KeySelectors also need to be deterministic?

The {@link KeySelector} allows to use deterministic objects for
operations such as reduce, reduceGroup, join, coGroup, etc. If
invoked multiple times on the same object, the returned key
must be the same.


On 04/02/2022 18:25, John Smith wrote:

Hi Francesco,  here is the reproducer:
https://github.com/javadevmtl/flink-key-reproducer

So, essentially it looks like when there's a high influx of
records produced from the source that the Exception is thrown.

The key is generated by 3 values: date/time rounded to the minute
and 2 strings.
So you will see keys as follows...
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar

The reproducer has a custom source that basically produces a
record in a loop and sleeps for a specified period of
milliseconds 100ms in this case.
The lower the sleep delay the faster records are produced the
more chances the exception is thrown. With a 100ms delay it's
always thrown. Setting a 2000 to 3000ms will guarantee it to work.
The original job uses a Kafka Source so it should technically be
able to handle even a couple thousand records per second.


On Thu, 3 Feb 2022 at 16:41, John Smith 
wrote:

Ok it's not my data either. I think it may be a volume issue.
I have managed to consistently reproduce the error. I'll
upload a reproducer ASAP.



On Thu, 3 Feb 2022 at 15:37, John Smith
 wrote:

Ok so I tried to create a reproducer but I couldn't
reproduce it. But the actual job once in a while throws
that error. So I'm wondering if maybe one of the records
that comes in is not valid, though I do validate prior to
getting to the key and window operators.

On Thu, 3 Feb 2022 at 14:32, John Smith
 wrote:

Actually maybe not because with PrintSinkFunction it
ran for a bit and then it threw the error.

On Thu, 3 Feb 2022 at 14:24, John Smith
 wrote:

Ok it may be the ElasticSearch connector causing
the issue?

If I use PrintSinkFunction then I get no error
and my stats print as expected.

On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani
 wrote:

Hi,
your hash code and equals seems correct. Can
you post a minimum stream pipeline reproducer
using this class?

FG

On Tue, Feb 1, 2022 at 8:39 PM John Smith
 wrote:

Hi, getting
java.lang.IllegalArgumentException: Key
group 39 is not in
KeyGroupRange{startKeyGroup=96,
endKeyGroup=103}. Unless you're directly
using low level state access APIs, this
is most likely caused by
non-deterministic shuffle key (hashCode
and equals implementation).

This is my class, is my hashCode
deterministic?

public final class MyEventCountKey {
    private final String countDateTime;
    private final String domain;
    private final String event;

    public MyEventCountKey(final String countDateTime, final String domain, final String event) {
        this.countDateTime = countDateTime;
        this.domain = domain;

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
I don't get it? I provided the reproducer. I implemented the interface to
Key selector it needs hashcode and equals as well?

I'm attempting to do click stream. So the key is based on processing
date/time rounded to the minute + domain name + path

So these should be valid below?

2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1

2022-01-01T10:02:00 + cnn.com + /article2

2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1

2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
wrote:

> Don't KeySelectors also need to be deterministic?
>
> The {@link KeySelector} allows to use deterministic objects for operations
> such as reduce, reduceGroup, join, coGroup, etc. If invoked multiple times
> on the same object, the returned key must be the same.
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco,  here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> So, essentially it looks like when there's a high influx of records
> produced from the source that the Exception is thrown.
>
> The key is generated by 3 values: date/time rounded to the minute and 2
> strings.
> So you will see keys as follows...
> 2022-02-04T17:20:00Z|foo|bar
> 2022-02-04T17:21:00Z|foo|bar
> 2022-02-04T17:22:00Z|foo|bar
>
> The reproducer has a custom source that basically produces a record in a
> loop and sleeps for a specified period of milliseconds 100ms in this case.
> The lower the sleep delay the faster records are produced the more chances
> the exception is thrown. With a 100ms delay it's always thrown. Setting a
> 2000 to 3000ms will guarantee it to work.
> The original job uses a Kafka Source so it should technically be able to
> handle even a couple thousand records per second.
>
>
> On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:
>
>> Ok it's not my data either. I think it may be a volume issue. I have
>> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>>
>>
>>
>> On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:
>>
>>> Ok so I tried to create a reproducer but I couldn't reproduce it. But
>>> the actual job once in a while throws that error. So I'm wondering if maybe
>>> one of the records that comes in is not valid, though I do validate prior
>>> to getting to the key and window operators.
>>>
>>> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>>>
 Actually maybe not because with PrintSinkFunction it ran for a bit and
 then it threw the error.

 On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:

> Ok it may be the ElasticSearch connector causing the issue?
>
> If I use PrintSinkFunction then I get no error and my stats print as
> expected.
>
> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> Hi,
>> your hash code and equals seems correct. Can you post a minimum
>> stream pipeline reproducer using this class?
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>> wrote:
>>
>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not
>>> in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're 
>>> directly
>>> using low level state access APIs, this is most likely caused by
>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>
>>> This is my class, is my hashCode deterministic?
>>>
>>> public final class MyEventCountKey {
>>>     private final String countDateTime;
>>>     private final String domain;
>>>     private final String event;
>>>
>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>         this.countDateTime = countDateTime;
>>>         this.domain = domain;
>>>         this.event = event;
>>>     }
>>>
>>>     public String getCountDateTime() {
>>>         return countDateTime;
>>>     }
>>>
>>>     public String getDomain() {
>>>         return domain;
>>>     }
>>>
>>>     public String getEven() {
>>>         return event;
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>         return countDateTime + "|" + domain + "|" + event;
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(Object o) {
>>>         if (this == o) return true;
>>>         if (o == null || getClass() != o.getClass()) return false;
>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>         return countDateTime.equals(that.countDateTime) &&
>>>                 domain.equals(that.domain) &&
>>>                 event.equals(that.event);
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>         final int prime = 31;
>>>         int result = 1;
>>>         result = prime * result + countDateTime.hashCode();

Re: How to prevent check pointing of timers ?

2022-02-07 Thread Alex Drobinsky
By timer I mean regular timer from KeyedState which utilized via function
onTimer, for example:


public class StateWithTimer {
public long timerValue = 0;
public volatile boolean shouldResetTimer = true;

public boolean resetIfMust(long timeoutInMilliseconds,
TimerService timerService) {
if (shouldResetTimer) {
setupTimer(timeoutInMilliseconds, timerService);
shouldResetTimer = false;
return true;
}
return false;
}

public void setupTimer(long timeoutInMilliseconds, TimerService
timerService) {
// Cancel previous timer
timerService.deleteProcessingTimeTimer(timerValue);
// Register new timer
// Should it be configurable ?
timerValue = (timerService.currentProcessingTime() +
timeoutInMilliseconds)*1000/1000;
timerService.registerProcessingTimeTimer(timerValue);
}

}


State classes that use timers extend StateWithTimer above; the function
resetIfMust is the current workaround - it resets the timer the first time
it fires after a restart (from a checkpoint or a fresh start).

@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector collector) throws Exception {
   MultiStorePacketState so = state.value();
   if (so.resetIfMust(StorePacketConfigurationParameters.partAggregationTimeout,
ctx.timerService())) {
  return;
   }
   closeAndReportFile(collector, so);

   ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
   state.update(so);
}
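
An alternative sketch of the same guard (not from the original thread; lastActivityTime
is an assumed field that processElement would update on every packet): keep the time of
the last seen packet in state and treat a timer that fires before the real deadline,
which is what happens right after a restore, as spurious and simply re-register it.

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception {
    MultiStorePacketState so = state.value();
    long timeout = StorePacketConfigurationParameters.partAggregationTimeout;
    long now = ctx.timerService().currentProcessingTime();

    // If the session has not really been idle for the full timeout (e.g. the timer
    // was restored from a checkpoint and fired immediately), push the deadline out
    // instead of closing the session prematurely.
    if (now - so.lastActivityTime < timeout) {
        long newDeadline = so.lastActivityTime + timeout;
        ctx.timerService().registerProcessingTimeTimer(newDeadline);
        so.timerValue = newDeadline;
        state.update(so);
        return;
    }

    closeAndReportFile(collector, so);
    state.update(so);
}

Whether downtime should count as idle time is a design choice; with this variant a
session that was genuinely idle across the outage still closes on the first timer
after restore.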





пн, 7 февр. 2022 г. в 05:06, Caizhi Weng :

> Hi!
>
> Could you elaborate more on your code or share it if possible? Which timer
> are you talking about? Are you using the data stream API or SQL API? Do you
> mean the timer registered per record for a window aggregation? Does mini
> batch aggregation [1] solve your problem?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tuning/#minibatch-aggregation
>
> Alex Drobinsky  于2022年2月3日周四 20:41写道:
>
>> Dear flink user,
>>
>> In our project, restoring the timer's state creates numerous issues, so I
>> would like to know
>> if it is possible to avoid save/restore of timers altogether.
>> If it isn't possible, how could I delete all registered timers during the
>> open function ?
>>
>> Best regards,
>> Alexander
>>
>


Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

2022-02-07 Thread HG
Super
Thanks

Op ma 7 feb. 2022 om 13:04 schreef Chesnay Schepler :

> I think you can safely ignore this warning. It shouldn't cause any harm,
> but I will file a ticket nonetheless.
>
> On 07/02/2022 12:52, HG wrote:
>
> I have nothing like that in the config (flink-conf.yaml).
>
> Just downloaded the software and did bin/start-cluster.sh
>
> Op ma 7 feb. 2022 om 10:52 schreef Chesnay Schepler :
>
>> I meant in the Flink config of the cluster you are submitting the jobs to.
>> Specifically whether classloader.check-leaked-classloader was set to
>> false.
>>
>> On 07/02/2022 10:28, HG wrote:
>>
>> Hi,
>>
>> Well I have set :
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setMaxParallelism(5);
>> env.setRuntimeMode(RuntimeExecutionMode.*STREAMING*);
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.enableCheckpointing(500);
>>
>> On the other hand .setBounded(OffsetsInitializer.latest())
>>
>> Perhaps that is a bit of a conflict?
>> The job should be unbounded anyway.
>> When I cancel it  (the unbounded) via the GUI and start it again I do not 
>> see the same issue.
>>
>> So perhaps not very important.
>> Regards Hans
>>
>>
>> Op ma 7 feb. 2022 om 09:23 schreef Chesnay Schepler :
>>
>>> Have you set anything beyond the defaults in the Flink configuration?
>>>
>>> This could just be noise with some Kafka stuff running in the background
>>> while Flink is shutting things down (and closing the classloader).
>>>
>>> On 04/02/2022 15:29, HG wrote:
>>>
>>> Hi,
>>>
>>> I am developing my flink application.
>>> For start I have built a class that reads events from Kafka and outputs
>>> them datastream.print()
>>>
>>> The job runs every time.
>>> But starting with the 2nd time I see this in the standalone session log:
>>>
>>> 2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils
>>>[] - Failed to close KafkaClient with type
>>> org.apache.kafka.clients.NetworkClient
>>> java.lang.NoClassDefFoundError:
>>> org/apache/kafka/common/network/Selector$CloseMode
>>> at
>>> org.apache.kafka.common.network.Selector.close(Selector.java:806)
>>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>> at
>>> org.apache.kafka.common.network.Selector.close(Selector.java:365)
>>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>> at
>>> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639)
>>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>> at
>>> org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834)
>>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>> at
>>> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)
>>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>> at java.lang.Thread.run(Thread.java:829) [?:?]
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.kafka.common.network.Selector$CloseMode
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>>> ~[?:?]
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
>>> at
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
>>> ... 6 more
>>> 2022-02-04 15:16:30,802 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
>>> coordinator for source Source: Kafka Source -> Sink: Print to Std. Out
>>> closed.
>>>
>>> Am I doing something wrong?
>>>
>>> This is basically the gist of the code:
>>>
>>> KafkaSource source = KafkaSource
>>> .builder()
>>> .setBootstrapServers(brokers)
>>> .setGroupId(groupId)
>>> .setTopics(kafkaInputTopic)
>>> .setValueOnlyDeserializer(new SimpleStringSchema())
>>> //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
>>> .setStartingOffsets(OffsetsInitializer.earliest())
>>> .setBounded(OffsetsInitializer.latest())
>>> .build();
>>>
>>> //withIdleness.duration()
>>> //env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>>> DataStream ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>>>
>>> ds.print();
>>>
>>>
>>>
>>
>


Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

2022-02-07 Thread Chesnay Schepler
I think you can safely ignore this warning. It shouldn't cause any harm, 
but I will file a ticket nonetheless.


On 07/02/2022 12:52, HG wrote:

I have nothing like that in the config (flink-conf.yaml).

Just downloaded the software and did bin/start-cluster.sh

Op ma 7 feb. 2022 om 10:52 schreef Chesnay Schepler :

I meant in the Flink config of the cluster you are submitting the
jobs to.
Specifically whether classloader.check-leaked-classloader was set
to false.

On 07/02/2022 10:28, HG wrote:

Hi,

Well I have set :
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(5);
env.setRuntimeMode(RuntimeExecutionMode.*STREAMING*);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(500);
On the other hand .setBounded(OffsetsInitializer.latest())
Perhaps that is a bit of a conflict? The job should be unbounded
anyway. When I cancel it (the unbounded) via the GUI and start it
again I do not see the same issue.
So perhaps not very important. Regards Hans


Op ma 7 feb. 2022 om 09:23 schreef Chesnay Schepler
:

Have you set anything beyond the defaults in the Flink
configuration?

This could just be noise with some Kafka stuff running in the
background while Flink is shutting things down (and closing
the classloader).

On 04/02/2022 15:29, HG wrote:

Hi,

I am developing my flink application.
For start I have built a class that reads events from Kafka
and outputs them datastream.print()

The job runs every time.
But starting with the 2nd time I see this in the standalone
session log:

2022-02-04 15:16:30,801 WARN
 org.apache.kafka.common.utils.Utils                  [] -
Failed to close KafkaClient with type
org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError:
org/apache/kafka/common/network/Selector$CloseMode
        at
org.apache.kafka.common.network.Selector.close(Selector.java:806)

~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at
org.apache.kafka.common.network.Selector.close(Selector.java:365)

~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at
org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639)

~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at
org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834)

[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at

org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)

[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.network.Selector$CloseMode
        at
java.net.URLClassLoader.findClass(URLClassLoader.java:476)
~[?:?]
        at
java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
        at

org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at

org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at

org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at
java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
        ... 6 more
2022-02-04 15:16:30,802 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator
[] - Source coordinator for source Source: Kafka Source ->
Sink: Print to Std. Out closed.

Am I doing something wrong?

This is basically the gist of the code:

KafkaSource source = KafkaSource
 .builder()
 .setBootstrapServers(brokers)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
 .setValueOnlyDeserializer(new SimpleStringSchema())

//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
.setStartingOffsets(OffsetsInitializer.earliest())
 .setBounded(OffsetsInitializer.latest())
 .build();

//withIdleness.duration() //env.fromSource(source,
WatermarkStrategy.forMonotonousTimestamps(), "Kafka 

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

2022-02-07 Thread HG
I have nothing like that in the config (flink-conf.yaml).

Just downloaded the software and did bin/start-cluster.sh

Op ma 7 feb. 2022 om 10:52 schreef Chesnay Schepler :

> I meant in the Flink config of the cluster you are submitting the jobs to.
> Specifically whether classloader.check-leaked-classloader was set to
> false.
>
> On 07/02/2022 10:28, HG wrote:
>
> Hi,
>
> Well I have set :
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setMaxParallelism(5);
> env.setRuntimeMode(RuntimeExecutionMode.*STREAMING*);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.enableCheckpointing(500);
>
> On the other hand .setBounded(OffsetsInitializer.latest())
>
> Perhaps that is a bit of a conflict?
> The job should be unbounded anyway.
> When I cancel it  (the unbounded) via the GUI and start it again I do not see 
> the same issue.
>
> So perhaps not very important.
> Regards Hans
>
>
> Op ma 7 feb. 2022 om 09:23 schreef Chesnay Schepler :
>
>> Have you set anything beyond the defaults in the Flink configuration?
>>
>> This could just be noise with some Kafka stuff running in the background
>> while Flink is shutting things down (and closing the classloader).
>>
>> On 04/02/2022 15:29, HG wrote:
>>
>> Hi,
>>
>> I am developing my flink application.
>> For start I have built a class that reads events from Kafka and outputs
>> them datastream.print()
>>
>> The job runs every time.
>> But starting with the 2nd time I see this in the standalone session log:
>>
>> 2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils
>>  [] - Failed to close KafkaClient with type
>> org.apache.kafka.clients.NetworkClient
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/network/Selector$CloseMode
>> at
>> org.apache.kafka.common.network.Selector.close(Selector.java:806)
>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>> at
>> org.apache.kafka.common.network.Selector.close(Selector.java:365)
>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>> at
>> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639)
>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>> at
>> org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834)
>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>> at
>> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)
>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>> at java.lang.Thread.run(Thread.java:829) [?:?]
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.common.network.Selector$CloseMode
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>> ~[?:?]
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
>> ... 6 more
>> 2022-02-04 15:16:30,802 INFO
>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
>> coordinator for source Source: Kafka Source -> Sink: Print to Std. Out
>> closed.
>>
>> Am I doing something wrong?
>>
>> This is basically the gist of the code:
>>
>> KafkaSource source = KafkaSource
>> .builder()
>> .setBootstrapServers(brokers)
>> .setGroupId(groupId)
>> .setTopics(kafkaInputTopic)
>> .setValueOnlyDeserializer(new SimpleStringSchema())
>> //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
>> .setStartingOffsets(OffsetsInitializer.earliest())
>> .setBounded(OffsetsInitializer.latest())
>> .build();
>>
>> //withIdleness.duration()
>> //env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>> DataStream ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>>
>> ds.print();
>>
>>
>>
>


RE: Re: Job requiring a lot of memory despite using rocksdb state backend

2022-02-07 Thread Salva Alcántara
The exceptions we got were mostly about the connection to some task
managers being lost, and the only way we could solve this was by
increasing memory. The reason we increased the managed fraction was
mostly to improve performance (by giving more memory to RocksDB for its
cache). We have also tried lower values, such as 0.25, 0.1 and 0.01.
As for memory consumption, in all cases the job demands all the
available memory in the long run; performance degrades, though, the lower
the value. What we are trying to achieve is to operate with some
visible margin, since seeing the job take all the memory seems dangerous.
When that happens, snapshots eventually start to fail and the
job itself becomes unstable (maybe after some days).
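
For what it's worth, a sketch of giving RocksDB a smaller, explicit budget rather than
90% of the process memory (option names as in the Flink 1.14 documentation; the sizes
are placeholders):

state.backend.rocksdb.memory.managed: 'true'
taskmanager.memory.managed.fraction: '0.4'
# Or give RocksDB a hard per-slot budget that overrides the managed fraction:
# state.backend.rocksdb.memory.fixed-per-slot: 2gb

Note that this only caps the memory Flink accounts for; allocator fragmentation and
other native allocations can still grow on top of it, which is why the jemalloc-based
images mentioned below are worth keeping.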

On 2022/02/07 08:10:37 Chesnay Schepler wrote:
> What were the errors you received that caused you to increase the
> amount of memory you provided to RocksDB?
>
> On 05/02/2022 07:12, Salva Alcántara wrote:
> > I have a job running on Flink 1.14.3 (Java 11) that uses rocksdb as
> > the state backend. The problem is that the job requires an amount of
> > memory pretty similar to the overall state size.
> >
> > Indeed, for making it stable (and capable of taking snapshots) this is
> > what I'm using:
> >
> > - 4 TMs with 30 GB of RAM and 7 CPUs
> > - Everything is run on top of Kubernetes on AWS using nodes with 32 GB
> > of RAM and locally attached SSD disks (M5ad instances for what it's
worth)
> >
> > I have these settings in place:
> >
> > ```
> > state.backend: rocksdb
> > state.backend.incremental: 'true'
> > state.backend.rocksdb.localdir: /opt/flink/rocksdb <-- SSD volume (see
> > below)
> > state.backend.rocksdb.memory.managed: 'true'
> > state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
> > taskmanager.memory.managed.fraction: '0.9'
> > taskmanager.memory.framework.off-heap.size: 512mb
> > taskmanager.numberOfTaskSlots: '4' (parallelism: 16)
> > ```
> >
> > Also this:
> >
> > ```
> >   - name: rocksdb-volume
> > volume:
> >   emptyDir:
> > sizeLimit: 100Gi
> >   name: rocksdb-volume
> > volumeMount:
> >   mountPath: /opt/flink/rocksdb
> > ```
> >
> > Which provides plenty of disk space for each task manager. With those
> > settings, the job runs smoothly and in particular there is a
> > relatively big memory margin. Problem is that memory consumption
> > slowly increases and also that with less memory margin snapshots fail.
> > I have tried reducing the number of taskmanagers but I need 4. Same
> > with the amount of RAM, I have tried giving e.g. 16 GB instead of 30
> > GB but same problem. Another setting that has worked for us is using 8
> > TMs each with 16 GB of RAM, but again, this leads to the same amount
> > of memory overall as the current settings. Even with that amount of
> > memory, I can see that memory keeps growing and will probably lead to
> > a bad end...
> >
> > Also, the latest snapshot took around 120 GBs, so as you can see I am
> > using an amount of RAM similar to the size of the total state, which
> > defeats the whole purpose of using a disk-based state backend
> > (rocksdb) plus local SSDs.
> >
> > Is there an effective way of limiting the memory that rocksdb takes
> > (to that available on the running pods)? Nothing I have found/tried
> > out so far has worked. Theoretically, the images I am using have
> > `jemalloc` in place for memory allocation, which should avoid memory
> > fragmentation issues observed with malloc in the past.
> >
> > PS: I posted the issue in SO too:
> >
https://stackoverflow.com/questions/70986020/flink-job-requiring-a-lot-of-memory-despite-using-rocksdb-state-backend
>
>
>


Re: CDC using Query

2022-02-07 Thread Leonard Xu
Hello, mohan

> 1. Does flink have any support to track any missed source Jdbc CDC records ? 

The Flink CDC connectors provide exactly-once semantics, which means they won't miss
records. Tip: the Flink JDBC connector only scans the database once; it cannot
continuously read a CDC stream.

> 2. What is the equivalent of Kafka consumer groups ?

Different databases have different CDC mechanisms: for MySQL/MariaDB it is the serverId
used to mark a replica, for PostgreSQL it is the slot name.


> 3. Delivering to kafka from flink is not exactly once. Is that right ?

No, both the Flink CDC connector and the Flink Kafka connector provide an exactly-once
implementation.

BTW, if your destination is Elasticsearch, the quick start demo[1] may help you.

Best,
Leonard

[1] 
https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html


> 
> Thanks
> 
> On Friday, February 4, 2022, mohan radhakrishnan wrote:
> Hello,
>So the jdbc source connector is  kafka and transformation is 
> done by flink (flink sql) ? But that connector can miss records. I thought. 
> Started looking at flink for this and other use cases.
> Can I see the alternative to spring cloudstreams( kafka streams )? Since I am 
> learning flink, kafka streams' changelog topics and exactly-once delivery and 
> dlqs seemed good for our critical push notifications.
> 
> We also needed a  elastic  sink.
> 
> Thanks
> 
> On Friday, February 4, 2022, Dawid Wysakowicz wrote:
> Hi Mohan,
> 
> I don't know much about Kafka Connect, so I will not talk about its features 
> and differences to Flink. Flink on its own does not have a capability to read 
> a CDC stream directly from a DB. However there is the flink-cdc-connectors[1] 
> projects which embeds the standalone Debezium engine inside of Flink's source 
> and can process DB changelog with all processing guarantees that Flink 
> provides.
> 
> As for the idea of processing further with Kafka Streams. Why not process 
> data with Flink? What do you miss in Flink?
> 
> Best,
> 
> Dawid
> 
> [1] https://github.com/ververica/flink-cdc-connectors 
> 
> 
> On 04/02/2022 13:55, mohan radhakrishnan wrote:
> Hi,
>  When I was looking for CDC I realized Flink uses Kafka Connector to 
> stream to Flink. The idea is to send it forward to Kafka and consume it using 
> Kafka Streams.
> 
> Are there source DLQs or additional mechanisms to detect failures to read 
> from the DB ?
> 
> We don't want to use Debezium and our CDC is based on queries.
> 
> What mechanisms does Flink have that a Kafka Connect worker does not ? Kafka 
> Connect workers can go down and source data can be lost.
> 
> Does the idea  to send it forward to Kafka and consume it using Kafka Streams 
> make sense ? The checkpointing feature of Flink can help ? I plan to use 
> Kafka Streams for 'Exactly-once Delivery' and changelog topics.
> 
> Could you point out relevant material to read ?
> 
> Thanks,
> Mohan



Re: Flink 1.12.1 and KafkaSource

2022-02-07 Thread Chesnay Schepler
First I'd check whether kafkaRequiresSsl is actually true when the job 
is submitted.

(actually just remove the condition and see what happens)

Would this supposed SSL OOM happen only if the client uses SSL?
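
For comparison, a minimal sketch of how the SSL client properties would be attached to
the builder (method names as in the Flink 1.13+/1.14 KafkaSource builder, which the 1.12
builder may differ from slightly; brokers, topic, groupId and kafkaRequiresSsl are
placeholders):

KafkaSourceBuilder<String> builder = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics(topic)
        .setGroupId(groupId)
        .setValueOnlyDeserializer(new SimpleStringSchema());

if (kafkaRequiresSsl) {
    // These must be applied to the builder before build(); otherwise the
    // ConsumerConfig falls back to security.protocol=PLAINTEXT, as in the log Marco posted.
    builder.setProperty("security.protocol", "SSL");
    builder.setProperty("ssl.truststore.location", "/etc/alternatives/jre/lib/security/cacerts");
    builder.setProperty("ssl.truststore.password", "changeit");
    builder.setProperty("ssl.truststore.type", "JKS");
}

KafkaSource<String> source = builder.build();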

On 03/02/2022 03:40, Marco Villalobos wrote:
According to the Flink 1.12 documentation 
(https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html), 
one should use FlinkKafkaConsumer when consuming from Kafka.


However, I noticed that the newer API uses KafkaSource, which uses 
KafkaSourceBuilder and OffsetsInitializer.


Although I am on the Flink 1.12 codebase, I preemptively decided to 
use KafkaSource instead in order to use the more advanced offsets 
feature. It worked, until I deployed it to EMR and had to connect to 
AWS Kafka (MSK).


The logs show a few suspicious things.

1) The ConsumerConfig logs these properties:

security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

but I actually passed the following:

security.protocol = SSL
ssl.truststore.location = /etc/alternatives/jre/lib/security/cacerts
ssl.truststore.password = changeit
ssl.truststore.type = JKS

2) The job fails and this exception is thrown:

2022-02-03 00:40:57,239 ERROR 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext 
[] - Exception while handling result from async call in 
SourceCoordinator-Source: kafka sensor tags -> Sink: s3 sink. 
Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle 
partition splits change due to
at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:223) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) 
[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_312]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_312]

at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
at 
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_312]

at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
~[?:1.8.0_312]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
~[?:1.8.0_312]

... 3 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: 
Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 
9223372036854775807 after 1 attempt(s)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) 
~[feature-LUM-5531-offset--b7c3bfee.jar:?]
at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_312]

at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
~[?:1.8.0_312]
at 

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

2022-02-07 Thread Chesnay Schepler

Have you set anything beyond the defaults in the Flink configuration?

This could just be noise with some Kafka stuff running in the background 
while Flink is shutting things down (and closing the classloader).


On 04/02/2022 15:29, HG wrote:

Hi,

I am developing my flink application.
For start I have built a class that reads events from Kafka and 
outputs them datastream.print()


The job runs every time.
But starting with the 2nd time I see this in the standalone session log:

2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils  [] 
- Failed to close KafkaClient with type 
org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError: 
org/apache/kafka/common/network/Selector$CloseMode
        at 
org.apache.kafka.common.network.Selector.close(Selector.java:806) 
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at 
org.apache.kafka.common.network.Selector.close(Selector.java:365) 
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at 
org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) 
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at 
org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) 
[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) 
[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]

        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.network.Selector$CloseMode
        at java.net.URLClassLoader.findClass(URLClassLoader.java:476) 
~[?:?]

        at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
        at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]

        at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
        ... 6 more
2022-02-04 15:16:30,802 INFO 
 org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
Source coordinator for source Source: Kafka Source -> Sink: Print to 
Std. Out closed.


Am I doing something wrong?

This is basically the gist of the code:

KafkaSource source = KafkaSource
 .builder()
 .setBootstrapServers(brokers)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
 .setValueOnlyDeserializer(new SimpleStringSchema())
//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class)) 
.setStartingOffsets(OffsetsInitializer.earliest())

 .setBounded(OffsetsInitializer.latest())
 .build();

//withIdleness.duration()
//env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
DataStream ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");


ds.print();




Re: Job requiring a lot of memory despite using rocksdb state backend

2022-02-07 Thread Chesnay Schepler
What were the errors you received that caused you to increase the 
amount of memory you provided to RocksDB?


On 05/02/2022 07:12, Salva Alcántara wrote:
I have a job running on Flink 1.14.3 (Java 11) that uses rocksdb as 
the state backend. The problem is that the job requires an amount of 
memory pretty similar to the overall state size.


Indeed, for making it stable (and capable of taking snapshots) this is 
what I'm using:


- 4 TMs with 30 GB of RAM and 7 CPUs
- Everything is run on top of Kubernetes on AWS using nodes with 32 GB 
of RAM and locally attached SSD disks (M5ad instances for what it's worth)


I have these settings in place:

```
state.backend: rocksdb
state.backend.incremental: 'true'
state.backend.rocksdb.localdir: /opt/flink/rocksdb <-- SSD volume (see 
below)

state.backend.rocksdb.memory.managed: 'true'
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
taskmanager.memory.managed.fraction: '0.9'
taskmanager.memory.framework.off-heap.size: 512mb
taskmanager.numberOfTaskSlots: '4' (parallelism: 16)
```

Also this:

```
  - name: rocksdb-volume
    volume:
      emptyDir:
        sizeLimit: 100Gi
      name: rocksdb-volume
    volumeMount:
      mountPath: /opt/flink/rocksdb
```

Which provides plenty of disk space for each task manager. With those 
settings, the job runs smoothly and in particular there is a 
relatively big memory margin. Problem is that memory consumption 
slowly increases and also that with less memory margin snapshots fail. 
I have tried reducing the number of taskmanagers but I need 4. Same 
with the amount of RAM, I have tried giving e.g. 16 GB instead of 30 
GB but same problem. Another setting that has worked for us is using 8 
TMs each with 16 GB of RAM, but again, this leads to the same amount 
of memory overall as the current settings. Even with that amount of 
memory, I can see that memory keeps growing and will probably lead to 
a bad end...


Also, the latest snapshot took around 120 GBs, so as you can see I am 
using an amount of RAM similar to the size of the total state, which 
defeats the whole purpose of using a disk-based state backend 
(rocksdb) plus local SSDs.


Is there an effective way of limiting the memory that rocksdb takes 
(to that available on the running pods)? Nothing I have found/tried 
out so far has worked. Theoretically, the images I am using have 
`jemalloc` in place for memory allocation, which should avoid memory 
fragmentation issues observed with malloc in the past.


PS: I posted the issue in SO too: 
https://stackoverflow.com/questions/70986020/flink-job-requiring-a-lot-of-memory-despite-using-rocksdb-state-backend





Re: Questions about Kryo setRegistrationRequired(false)

2022-02-07 Thread Chesnay Schepler

There isn't any setting to control setRegistrationRequired().

You can however turn Kryo off via ExecutionConfig#disableGenericTypes, 
although this may require changes to your data types.


I'd recommend to file a ticket.
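
For completeness, a minimal sketch of the Kryo opt-out mentioned above (whether your
job's types can be serialized without the Kryo fallback has to be checked case by case):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Any type that would otherwise fall back to Kryo's generic serializer now causes an
// eager failure instead of being (de)serialized by Kryo at runtime.
env.getConfig().disableGenericTypes();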

On 04/02/2022 20:12, Shane Bishop wrote:

Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in 
Apache Flink might introduce serialization/deserialization 
vulnerabilities, and I want to better understand the security 
implications of its use in Flink.


There is an issue on the Kryo GitHub repo regarding type registration. The
"fix" the Kryo developers made was to make setRegistrationRequired(true) the
default (see the comment on the GitHub issue, the commit with this fix, and
the line in the commit that is the fix).


This is not a true fix, as the default can still be overridden. This 
only sets a safe default.


In Flink, the default of true is overridden in the 1.14.3 Flink release
(see KryoSerializer.java and FlinkScalaKryoInstantiator.scala).


I am no Flink contributor, so I might be missing safety mechanisms 
that are in place to prevent the Kryo serialization/deserialization 
vulnerability even when registration required is set to false. Are 
there any such safety mechanisms in place?


Is there anything I can do as a user of Flink to protect myself 
against this Kryo vulnerability?


Best regards,
Shane Bishop