Re: How to prevent checkpointing of timers?

2022-02-07 Thread Frank Dekervel
Hello, I guess you already evaluated moving to event time and were not able to? Because this really seems to be a case for moving to event-time timers. I think that would require some effort (including choosing a good watermark strategy) but would then solve all your problems. Frank On
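
A minimal sketch of what moving to event-time timers could look like, assuming a KeyedProcessFunction and a bounded-out-of-orderness watermark strategy; the Event type, its getTimestamp() accessor, and the 60-second timeout are hypothetical:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimeoutFunction extends KeyedProcessFunction<String, Event, Event> {

    // Example watermark strategy: tolerate up to 10s of out-of-orderness.
    static WatermarkStrategy<Event> watermarks() {
        return WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, ts) -> event.getTimestamp());
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        // Event-time timers fire on watermark progress, not wall-clock time,
        // so they replay deterministically after restoring from a checkpoint.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        // The connection would be considered timed out / completed here.
    }
}
```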

Re: How to prevent checkpointing of timers?

2022-02-07 Thread Alex Drobinsky
Sure :) The problem could be defined as the following: Imagine you have a stream of data, for example, network traffic. This network traffic is keyed by source address / source port / destination address / destination port / protocol type. Every connection could be "completed" in two ways: 1) we
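
For illustration, keying such a stream by the connection 5-tuple might look like the sketch below; the Packet type and its accessors are hypothetical:

```java
import org.apache.flink.api.java.functions.KeySelector;

public class ConnectionKeySelector implements KeySelector<Packet, String> {
    @Override
    public String getKey(Packet p) {
        // Deterministic: the key depends only on fields of the element itself.
        return p.getSrcAddr() + ":" + p.getSrcPort() + "->"
                + p.getDstAddr() + ":" + p.getDstPort() + "/" + p.getProtocol();
    }
}
```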

Re: How to prevent checkpointing of timers?

2022-02-07 Thread Yun Tang
Hi Alex, I think the better approach is to first understand what problem you actually ran into when restoring the timers. Flink does not currently support removing state (including timer state). Best Yun Tang From: Alex Drobinsky Sent: Monday, February 7, 2022 21:09 To:
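
While timer state as a whole cannot be dropped on restore, individual timers can be deleted at runtime through the TimerService. A minimal sketch, assuming a KeyedProcessFunction with a hypothetical ValueState<Long> named timerState that tracks the pending timestamp:

```java
// Assumed field of the enclosing KeyedProcessFunction (hypothetical):
//   private ValueState<Long> timerState; // pending timer timestamp, if any

// Inside processElement(): cancel a previously registered
// processing-time timer before it fires.
Long pending = timerState.value();
if (pending != null) {
    ctx.timerService().deleteProcessingTimeTimer(pending);
    timerState.clear();
}
```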

Re: Loading a Hive lookup table data into TM memory takes so long.

2022-02-07 Thread Caizhi Weng
Hi! > If Flink is not happy with a large Hive table data Currently it is. A Hive lookup table (currently implemented just like a filesystem lookup table) cannot look up values with a specific key, so it has to load all data into memory. > Did you mean putting the Hive table data into a Kafka/Kinesis

Re: Questions about Kryo setRegistrationRequired(false)

2022-02-07 Thread Shane Bishop
Thanks Chesnay Schepler. I filed a ticket: https://issues.apache.org/jira/browse/FLINK-25993 My team will try disabling Kryo with ExecutionConfig#disableGenericTypes and see if we need to change our data types or not. Best regards, Shane From: Chesnay Schepler

Re: pyflink datastream job

2022-02-07 Thread nagi data monkey
ah, thanks, those doc pages are what I missed! On Mon, Feb 7, 2022 at 2:48 AM Dian Fu wrote: > The following code snippet should just work: > > ``` > from pyflink.datastream import StreamExecutionEnvironment > env = StreamExecutionEnvironment.get_execution_environment() > ``` > > It works both

Re: Loading a Hive lookup table data into TM memory takes so long.

2022-02-07 Thread Jason Yi
Hi Caizhi, Could you tell me more details about streaming joins that you suggested? Did you mean putting the Hive table data into a Kafka/Kinesis and joining the main stream with the hive table data streaming with a very long watermark? In my use case, the hive table is an account dimension

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Ok I think Ali's solution makes the most sense to me. I'll try it and let you know. On Mon, Feb 7, 2022 at 11:44 AM Jing Ge wrote: > Hi John, > > your getKey() implementation shows that it is not deterministic, since > calling it with the same click instance multiple times will return >

Re: Reading from Kafka kafkarecorddeserializationschema

2022-02-07 Thread HG
Hi Dawid, I am a little bit worried by the code because of the ByteBuffer and the endianness. Do I really need to worry about them and determine them too? Or was it just convenient to use a ByteBuffer and the endianness here? Regards Hans public Event deserialize(byte[] message) throws
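
For what it's worth, a ByteBuffer defaults to big-endian, which is also what Kafka's own primitive serializers write, so the explicit order() call below mainly documents intent. This is a sketch only: the field layout (a long timestamp followed by a UTF-8 payload) and the Event constructor are hypothetical:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public Event deserialize(byte[] message) throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.BIG_ENDIAN);
    long timestamp = buffer.getLong();             // fixed-width header field
    byte[] payload = new byte[buffer.remaining()]; // everything after the header
    buffer.get(payload);
    return new Event(timestamp, new String(payload, StandardCharsets.UTF_8));
}
```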

Re: How to proper hashCode() for keys.

2022-02-07 Thread Jing Ge
Hi John, your getKey() implementation shows that it is not deterministic, since calling it with the same click instance multiple times will return different keys. For example a call at 12:01:59.950 and a call at 12:02:00.050 with the same click instance will return two different keys:

Re: How to proper hashCode() for keys.

2022-02-07 Thread Ali Bahadir Zeybek
Hello John, During the lifecycle of the execution for a given event, the key information is not passed between different operators; instead, it is computed from the given key selector every time a (keyed) operator sees the event. Therefore, the same event, within the same pipeline, could be

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Maybe there's a misunderstanding. But basically I want to do a clickstream count for a given "url" and, for simplicity and accuracy of the count, base it on processing time (event time doesn't matter as long as I get a total of clicks at that given processing time). So regardless of the event time, I

Re: How to proper hashCode() for keys.

2022-02-07 Thread David Morávek
> The key selector works. No it does not ;) It depends on the system time, so it's not deterministic (you can get different keys for the very same element). > How do you key a count based on the time? I have taken this from samples online. This is what the windowing is for. You basically
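
A minimal sketch of the windowed approach, letting the window rather than the key carry the time dimension; the Click type and its getUrl() accessor are hypothetical:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public static DataStream<Tuple2<String, Long>> countClicksPerMinute(DataStream<Click> clicks) {
    return clicks
            .map(c -> Tuple2.of(c.getUrl(), 1L))
            .returns(Types.TUPLE(Types.STRING, Types.LONG))
            .keyBy(t -> t.f0) // deterministic: same element, same key
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .sum(1);          // one count per URL per processing-time minute
}
```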

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
The key selector works. It only causes an issue if too many keys are produced in one shot. For example, if 100 "same" keys are produced for that 1 minute it's ok. But if 101 are produced, the error happens. If you look at the reproducer, at least that's what's happening. How do you key a count

Re: Example with JSONKeyValueDeserializationSchema?

2022-02-07 Thread HG
Hello Kamil et al., When I build this code: KafkaSource source = KafkaSource.builder() .setProperties(kafkaProps) .setProperty("ssl.truststore.type", trustStoreType) .setProperty("ssl.truststore.password", trustStorePassword)
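
One guess at how the truncated builder might be completed, using KafkaRecordDeserializationSchema.of to wrap the JSON key/value schema; the topic name is hypothetical, and kafkaProps, trustStoreType and trustStorePassword are the poster's own variables:

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setTopics("my-topic") // hypothetical
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                // false = do not include topic/partition/offset metadata
                new JSONKeyValueDeserializationSchema(false)))
        .build();
```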

Re: How to proper hashCode() for keys.

2022-02-07 Thread Chesnay Schepler
Your key selector doesn't need to implement hashCode, but given the same object it has to return the same key. In your reproducer the returned key will have different timestamps, and since the timestamp is included in the hashCode, the keys will be different each time. On 07/02/2022 14:50, John

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
I don't get it. I provided the reproducer. I implemented the KeySelector interface; the key it returns needs hashCode and equals as well? I'm attempting to do clickstream counting. So the key is based on processing date/time rounded to the minute + domain name + path. So these should be valid below?
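
A sketch of what a deterministic key class could look like, assuming the minute bucket is derived from a field of the event itself (never from System.currentTimeMillis()):

```java
import java.util.Objects;

public class ClickKey {
    public final long minuteBucket; // event timestamp truncated to the minute
    public final String domain;
    public final String path;

    public ClickKey(long minuteBucket, String domain, String path) {
        this.minuteBucket = minuteBucket;
        this.domain = domain;
        this.path = path;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ClickKey)) return false;
        ClickKey k = (ClickKey) o;
        return minuteBucket == k.minuteBucket
                && domain.equals(k.domain)
                && path.equals(k.path);
    }

    @Override
    public int hashCode() {
        // Stable across invocations because all fields are immutable values.
        return Objects.hash(minuteBucket, domain, path);
    }
}
```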

Re: How to prevent checkpointing of timers?

2022-02-07 Thread Alex Drobinsky
By timer I mean a regular timer from keyed state, which is utilized via the onTimer function, for example: public class StateWithTimer { public long timerValue = 0; public volatile boolean shouldResetTimer = true; public boolean resetIfMust(long timeoutInMilliseconds, TimerService
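
One guess at how the truncated class might continue; the reset logic shown (drop any previously registered processing-time timer and register a new one) is an assumption, not the poster's actual code:

```java
import org.apache.flink.streaming.api.TimerService;

public class StateWithTimer {
    public long timerValue = 0;
    public volatile boolean shouldResetTimer = true;

    public boolean resetIfMust(long timeoutInMilliseconds, TimerService timerService) {
        if (shouldResetTimer) {
            if (timerValue > 0) {
                // Cancel the previously registered timer, if any.
                timerService.deleteProcessingTimeTimer(timerValue);
            }
            timerValue = timerService.currentProcessingTime() + timeoutInMilliseconds;
            timerService.registerProcessingTimeTimer(timerValue);
            shouldResetTimer = false;
            return true;
        }
        return false;
    }
}
```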

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

2022-02-07 Thread HG
Super, thanks! On Mon, Feb 7, 2022 at 13:04, Chesnay Schepler wrote: > I think you can safely ignore this warning. It shouldn't cause any harm, > but I will file a ticket nonetheless. > > On 07/02/2022 12:52, HG wrote: > > I have nothing like that in the config (flink-conf.yaml). > > Just

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

2022-02-07 Thread Chesnay Schepler
I think you can safely ignore this warning. It shouldn't cause any harm, but I will file a ticket nonetheless. On 07/02/2022 12:52, HG wrote: I have nothing like that in the config (flink-conf.yaml). Just downloaded the software and did bin/start-cluster.sh On Mon, Feb 7, 2022 at 10:52

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

2022-02-07 Thread HG
I have nothing like that in the config (flink-conf.yaml). Just downloaded the software and did bin/start-cluster.sh On Mon, Feb 7, 2022 at 10:52, Chesnay Schepler wrote: > I meant in the Flink config of the cluster you are submitting the jobs to. > Specifically whether

RE: Re: Job requiring a lot of memory despite using rocksdb state backend

2022-02-07 Thread Salva Alcántara
The exceptions we got were mostly about the connection to some task managers being lost, and the only way we could solve this was by increasing memory. The reason we increased the managed fraction was mostly to improve performance (by giving RocksDB more memory for its cache).
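
For reference, the managed fraction mentioned here corresponds to the taskmanager.memory.managed.fraction option (default 0.4). On a real cluster it is set in flink-conf.yaml; a minimal sketch of setting it programmatically for a local environment:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Give RocksDB a larger share of Flink-managed memory than the 0.4 default.
conf.set(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.6f);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
```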

Re: CDC using Query

2022-02-07 Thread Leonard Xu
Hello, mohan > 1. Does flink have any support to track any missed source Jdbc CDC records? The Flink CDC connector provides exactly-once semantics, which means it won't miss records. Tip: the Flink JDBC connector only scans the database once; it cannot continuously read a CDC stream. > 2.

Re: Flink 1.12.1 and KafkaSource

2022-02-07 Thread Chesnay Schepler
First I'd check whether kafkaRequiresSsl is actually true when the job is submitted. (Actually, just remove the condition and see what happens.) Would this supposed SSL OOM happen only if the client uses SSL? On 03/02/2022 03:40, Marco Villalobos wrote: According to the Flink 1.12 documentation

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

2022-02-07 Thread Chesnay Schepler
Have you set anything beyond the defaults in the Flink configuration? This could just be noise with some Kafka stuff running in the background while Flink is shutting things down (and closing the classloader). On 04/02/2022 15:29, HG wrote: Hi, I am developing my flink application. For

Re: Job requiring a lot of memory despite using rocksdb state backend

2022-02-07 Thread Chesnay Schepler
What were the errors you received that caused you to increase the amount of memory you provided to RocksDB? On 05/02/2022 07:12, Salva Alcántara wrote: I have a job running on Flink 1.14.3 (Java 11) that uses rocksdb as the state backend. The problem is that the job requires an amount of

Re: Questions about Kryo setRegistrationRequired(false)

2022-02-07 Thread Chesnay Schepler
There isn't any setting to control setRegistrationRequired(). You can, however, turn Kryo off via ExecutionConfig#disableGenericTypes, although this may require changes to your data types. I'd recommend filing a ticket. On 04/02/2022 20:12, Shane Bishop wrote: Hi all, TL;DR: I am concerned
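
Turning the Kryo fallback off is a one-liner; a minimal sketch:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Any type that would otherwise fall back to Kryo (a "generic type")
// now fails fast instead of being silently serialized with Kryo.
env.getConfig().disableGenericTypes();
```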