Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread John Smith
Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here I'm not sure how to calculate the formula. My
suspicion is that I may have allocated too much
of taskmanager.memory.flink.size and the total including MaxDirectMemory is
more than what the physical OS has, is that possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out for this formula
MaxDirectMemory
of TM = Network Memory + Task Off-Heap + Framework Off-heap?

On Thu, May 23, 2024 at 9:01 AM John Smith  wrote:

> Ok, but I still don't get why it's doing it... It's the same version of
> flink... Only difference is java 11 and also I allocated more JVM heap and
> the actual physical is has more ram. Maybe I should reduce the JVM heap by
> a a gigabyte or two?
>
> On Wed, May 22, 2024, 12:37 PM Zhanghao Chen 
> wrote:
>
>> Hi John,
>>
>> A side note here: Flink will set the MaxDirectMemory of TM = Network
>> Memory + Task Off-Heap + Framework Off-heap, and overwrites JVM's default
>> setting, regardless of the version of JVM.
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* John Smith 
>> *Sent:* Wednesday, May 22, 2024 22:56
>> *To:* Biao Geng 
>> *Cc:* user 
>> *Subject:* Re: Would Java 11 cause Getting OutOfMemoryError: Direct
>> buffer memory?
>>
>> Hi, apologies I hit reply instead of reply all. So not sure who saw this
>> or didn't. We have not switched to SSL and also our assumption here
>> would be that if we did switch to SSL the jobs would not work or produce
>> invalid results. The jobs work absolutely fine for a week or so and then
>> they fail.
>>
>> Here is the consumer config from the task logs, which says PLAINTEXT and
>> port 9092 is used. Also I attached a screen of the task manager memory
>> usage. As well I read up on MaxDirectMemory setting of Java 8 vs Java 11.
>> Java 8 by default calculates the direct memory size to 87% of the max heap
>> size. While Java 11 set it to 100% of the max heap size.
>>
>> [image: Screen Shot 2024-05-22 at 9.50.38 AM.png]
>>
>>  allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092,
>> xx-kafka-0003:9092]
>> check.crcs = true
>> client.dns.lookup = default
>> client.id = xx
>> client.rack =
>> connections.max.idle.ms = 54
>> default.api.timeout.ms = 6
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = xx
>> group.instance.id = null
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 30
>> max.poll.records = 500
>> metadata.max.age.ms = 30
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 3
>> partition.assignment.strategy = [class
>> org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 6
>> retry.backoff.ms = 100
>> sasl.client.callback.handler.class = null
>> sasl.jaas.config = null
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 6
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.login.callback.handler.class = null
>> sasl.login.class = null
>> sasl.login.refresh.buffer.seconds = 300
>> sasl.login.refresh.min.period.seconds = 60
>> sasl.login.refresh.window.factor = 0.8
>> sasl.login.refresh.window.jitter = 0.05
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> security.providers = null
>> send.buffer.bytes = 131072
>> session.timeout.ms = 1
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = https
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password 

Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-16 Thread John Smith
Hi. No I have not changed the protocol.

On Thu, May 16, 2024, 3:20 AM Biao Geng  wrote:

> Hi John,
>
> Just want to check, have you ever changed the kafka protocol in your job
> after using the new cluster? The error message shows that it is caused by
> the kafka client and there is a similar error in this issue
> <https://ververica.zendesk.com/hc/en-us/articles/4413642980498-Direct-buffer-OutOfMemoryError-when-using-Kafka-Connector-in-Flink>
> .
>
> Best,
> Biao Geng
>
>
> John Smith  于2024年5月16日周四 09:01写道:
>
>> I deployed a new cluster, same version as my old cluster(1.14.4 ), only
>> difference using Java 11 and it seems after a week of usage the below
>> exception happens.
>>
>> The task manager is...
>>
>> 32GB total
>>
>> And i have the ONLY following memory settings
>>
>> taskmanager.memory.flink.size: 16384m
>> taskmanager.memory.jvm-metaspace.size: 3072m
>>
>>
>>
>>
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>> at java.base/java.nio.DirectByteBuffer.(DirectByteBuffer.java:118)
>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>> at
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>> at
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>> at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>> at
>> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ... 1 more
>>
>


Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-15 Thread John Smith
I deployed a new cluster, same version as my old cluster(1.14.4 ), only
difference using Java 11 and it seems after a week of usage the below
exception happens.

The task manager is...

32GB total

And i have the ONLY following memory settings

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m




Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things: either job(s)
require(s) a larger size of JVM direct memory or there is a direct memory
leak. The direct memory can be allocated by user code or some of its
dependencies. In this case 'taskmanager.memory.task.off-heap.size'
configuration option should be increased. Flink framework and its
dependencies also consume the direct memory, mostly for network
communication. The most of network memory is managed by Flink and should
not result in out-of-memory error. In certain special cases, in particular
for jobs with high parallelism, the framework may require more direct
memory which is not managed by Flink. In this case
'taskmanager.memory.framework.off-heap.size' configuration option should be
increased. If the error persists then there is probably a direct memory
leak in user code or some of its dependencies which has to be investigated
and fixed. The task executor has to be shutdown...
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more


Re: How to tell if job is being restarted in log?

2023-11-06 Thread John Smith
Ok thanks. :)

On Mon, Nov 6, 2023 at 2:58 AM Junrui Lee  wrote:

> Hi John,
>
> If you want to know more details about why your job is restarting, you can
> search for the keyword "to FAILED" in the JobManager logs. These log
> entries will show you the timing of each restart and the associated
> exception information. Additionally, you can check the exception page to
> find relevant details.
>
> Best,
> Junrui
>
> John Smith  于2023年11月4日周六 09:27写道:
>
>> Hi I'm getting metaspace issues and I understand that certain libraries
>> like JDBC don't unload properly and we need to put them in the global class
>> path of flink.
>>
>> But technically my jobs should not be restarting, so what can I look for
>> in the logs to see when the restart?
>>
>


How to tell if job is being restarted in log?

2023-11-03 Thread John Smith
Hi I'm getting metaspace issues and I understand that certain libraries
like JDBC don't unload properly and we need to put them in the global class
path of flink.

But technically my jobs should not be restarting, so what can I look for in
the logs to see when the restart?


Re: Why is task manager shutting down?

2022-09-29 Thread John Smith
Sorry I mean the 180 seconds. Where does flink decide that 180 seconds is
the cutoff point... And can I increase it.

On Thu., Sep. 29, 2022, 7:02 a.m. John Smith, 
wrote:

> Is there a way to increase the 30 seconds to 60? Where is that 30 second
> timeout set?
>
> I have jdbc query timeout but at some point at night the insert takes a
> bit longer cause of index rebuilding.
>
> On Wed., Sep. 28, 2022, 5:02 a.m. Congxian Qiu, 
> wrote:
>
>> Hi John
>>
>> Yes, the whole TaskManager exited because the task did not react to
>> cancelling signal in time
>>
>> ```
>>
>> 2022-08-30 09:14:22,138 ERROR 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Task did 
>> not exit gracefully within 180 + seconds.
>> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
>> within 180 + seconds.
>>  at 
>> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1791)
>>  [flink-dist_2.12-1.14.4.jar:1.14.4]
>>  at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
>> 2022-08-30 09:14:22,139 ERROR 
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal 
>> error occurred while executing the TaskManager. Shutting it down...
>>
>> ```
>>
>>
>>  And the task stack logged such as below when cancelling the sink task
>>
>> ```
>>
>> 2022-08-30 09:14:22,135 WARN  org.apache.flink.runtime.taskmanager.Task  
>>   [] - Task 'Sink: jdbc (1/1)#359' did not react to cancelling 
>> signal - notifying TM; it is stuck for 180 seconds in method:
>>  java.net.SocketInputStream.socketRead0(Native Method)
>> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>> java.net.SocketInputStream.read(SocketInputStream.java:171)
>> java.net.SocketInputStream.read(SocketInputStream.java:141)
>> com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:2023)
>> com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:6418)
>> com.microsoft.sqlserver.jdbc.TDSCommand.startResponse(IOBuffer.java:7579)
>> com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:592)
>> com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:524)
>> com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7194)
>> com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2979)
>> com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:248)
>> com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:223)
>> com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.execute(SQLServerPreparedStatement.java:505)
>> com.xx.common.flink.connectors.jdbc.xxJdbcJsonOutputFormat.flush(xxJdbcJsonOutputFormat.java:111)
>> com.xx.common.flink.connectors.jdbc.xxJdbcJsonSink.snapshotState(xxJdbcJsonSink.java:33)
>> ```
>>
>>
>> Best,
>> Congxian
>>
>>
>> John Smith  于2022年9月23日周五 23:35写道:
>>
>>> Sorry new file:
>>> https://www.dropbox.com/s/mm9521crwvevzgl/flink-flink-taskexecutor-274-flink-prod-v-task-0001.log?dl=0
>>>
>>> On Fri, Sep 23, 2022 at 11:26 AM John Smith 
>>> wrote:
>>>
>>>> Hi I have attached the logs here...
>>>>
>>>>
>>>> https://www.dropbox.com/s/12gwlps52lvxdhz/flink-flink-taskexecutor-274-flink-prod-v-task-0001.log?dl=0
>>>>
>>>> 1- It looks like a timeout issue. Can someone confirm?
>>>> 2- The task manager is restarted, since I have restart on failure in
>>>> SystemD. But it seems after a few restarts it stops. Does it mean that
>>>> SystemD has an internal counter of how many times it will restart a service
>>>> before it doesn't do it anymore?
>>>>
>>>


Re: Why is task manager shutting down?

2022-09-29 Thread John Smith
Is there a way to increase the 30 seconds to 60? Where is that 30 second
timeout set?

I have jdbc query timeout but at some point at night the insert takes a bit
longer cause of index rebuilding.

On Wed., Sep. 28, 2022, 5:02 a.m. Congxian Qiu, 
wrote:

> Hi John
>
> Yes, the whole TaskManager exited because the task did not react to
> cancelling signal in time
>
> ```
>
> 2022-08-30 09:14:22,138 ERROR 
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Task did 
> not exit gracefully within 180 + seconds.
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
> within 180 + seconds.
>   at 
> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1791)
>  [flink-dist_2.12-1.14.4.jar:1.14.4]
>   at java.lang.Thread.run(Thread.java:750) [?:1.8.0_342]
> 2022-08-30 09:14:22,139 ERROR 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal error 
> occurred while executing the TaskManager. Shutting it down...
>
> ```
>
>
>  And the task stack logged such as below when cancelling the sink task
>
> ```
>
> 2022-08-30 09:14:22,135 WARN  org.apache.flink.runtime.taskmanager.Task   
>  [] - Task 'Sink: jdbc (1/1)#359' did not react to cancelling 
> signal - notifying TM; it is stuck for 180 seconds in method:
>  java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> java.net.SocketInputStream.read(SocketInputStream.java:171)
> java.net.SocketInputStream.read(SocketInputStream.java:141)
> com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:2023)
> com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:6418)
> com.microsoft.sqlserver.jdbc.TDSCommand.startResponse(IOBuffer.java:7579)
> com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:592)
> com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:524)
> com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7194)
> com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2979)
> com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:248)
> com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:223)
> com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.execute(SQLServerPreparedStatement.java:505)
> com.xx.common.flink.connectors.jdbc.xxJdbcJsonOutputFormat.flush(xxJdbcJsonOutputFormat.java:111)
> com.xx.common.flink.connectors.jdbc.xxJdbcJsonSink.snapshotState(xxJdbcJsonSink.java:33)
> ```
>
>
> Best,
> Congxian
>
>
> John Smith  于2022年9月23日周五 23:35写道:
>
>> Sorry new file:
>> https://www.dropbox.com/s/mm9521crwvevzgl/flink-flink-taskexecutor-274-flink-prod-v-task-0001.log?dl=0
>>
>> On Fri, Sep 23, 2022 at 11:26 AM John Smith 
>> wrote:
>>
>>> Hi I have attached the logs here...
>>>
>>>
>>> https://www.dropbox.com/s/12gwlps52lvxdhz/flink-flink-taskexecutor-274-flink-prod-v-task-0001.log?dl=0
>>>
>>> 1- It looks like a timeout issue. Can someone confirm?
>>> 2- The task manager is restarted, since I have restart on failure in
>>> SystemD. But it seems after a few restarts it stops. Does it mean that
>>> SystemD has an internal counter of how many times it will restart a service
>>> before it doesn't do it anymore?
>>>
>>


Re: Why is task manager shutting down?

2022-09-23 Thread John Smith
Sorry new file:
https://www.dropbox.com/s/mm9521crwvevzgl/flink-flink-taskexecutor-274-flink-prod-v-task-0001.log?dl=0

On Fri, Sep 23, 2022 at 11:26 AM John Smith  wrote:

> Hi I have attached the logs here...
>
>
> https://www.dropbox.com/s/12gwlps52lvxdhz/flink-flink-taskexecutor-274-flink-prod-v-task-0001.log?dl=0
>
> 1- It looks like a timeout issue. Can someone confirm?
> 2- The task manager is restarted, since I have restart on failure in
> SystemD. But it seems after a few restarts it stops. Does it mean that
> SystemD has an internal counter of how many times it will restart a service
> before it doesn't do it anymore?
>


Why is task manager shutting down?

2022-09-23 Thread John Smith
Hi I have attached the logs here...

https://www.dropbox.com/s/12gwlps52lvxdhz/flink-flink-taskexecutor-274-flink-prod-v-task-0001.log?dl=0

1- It looks like a timeout issue. Can someone confirm?
2- The task manager is restarted, since I have restart on failure in
SystemD. But it seems after a few restarts it stops. Does it mean that
SystemD has an internal counter of how many times it will restart a service
before it doesn't do it anymore?


Re: Is there any Natural Language Processing samples for flink?

2022-07-27 Thread John Smith
But we can use some sort of Java/Scala NLP lib within our Fink Jobs I
guess...

On Tue, Jul 26, 2022 at 9:59 PM Yunfeng Zhou 
wrote:

> Hi John,
>
> So far as I know, Flink does not have an official library or sample
> specializing in NLP cases yet. You can refer to Flink ML[1] for machine
> learning samples or Deep Learning on Flink[2] for deep learning samples.
>
> [1] https://github.com/apache/flink-ml
> [2] https://github.com/flink-extended/dl-on-flink
>
> Best,
> Yunfeng
>
> On Tue, Jul 26, 2022 at 10:49 PM John Smith 
> wrote:
>
>> As the title asks... All I see is Spark examples.
>>
>> Thanks
>>
>


Is there any Natural Language Processing samples for flink?

2022-07-26 Thread John Smith
As the title asks... All I see is Spark examples.

Thanks


Re: Task manager shutting down.

2022-05-05 Thread John Smith
Actually what's happening is there's a nightly indexing job. So when we
call the insert it takes longer than the specified checkpoint threshold.
JDBC will hapilly continue waiting for a response from the DB until it's
done. So the checkpoint threshold is reached and the job tries to shut down
and restart, but the job is blocked on the JDBC driver and it's causing all
kinds of crazy exceptions as you see in the logs.

So a stop gap solution was to add setQueryTimeout to a value a bit shorter
than the threshold of the checkpoint. This allows the job to fail
"gracefully" and restart until indexing is done.

1- We can review the indexing policy, if it's required nightly, which just
means that instead of having the job fail every night it will fail only
when the indexing happens.
2- The other is to try to figure out a way to pause the job, maybe through
cron and savepoints. But it seems way overly thought.

On Wed, May 4, 2022 at 1:40 PM Martijn Visser 
wrote:

> Hi John,
>
> In an ideal scenario you would be able to leverage Flink's backpressure
> mechanism. That would effectively slow down the processing until the reason
> for backpressure has been resolved. However, given that indexing happens
> after you've sinked your result, from a Flink perspective, the action is
> completed. Perhaps someone else has a different idea on how to achieve
> this.
>
> Best regards,
>
> Martijn
>
> On Wed, 4 May 2022 at 19:31, John Smith  wrote:
>
>> So I know specifically, it's the indexing and I put setQueryTimeout. So
>> the job fails. And goes into retry. That's fine.
>>
>> But just wondering is there a way to pause the stream at a specified
>> time/checkpoint and then resume after a specified time?
>>
>> On Wed, May 4, 2022 at 10:23 AM Martijn Visser 
>> wrote:
>>
>>> Hi John,
>>>
>>> It is generic, but each database has its own dialect implementation
>>> because they all have their differences unfortunately :)
>>>
>>> I wish I knew how I could help you out here. Perhaps some of the JDBC
>>> maintainers could chip in.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Sun, 1 May 2022 at 04:06, John Smith  wrote:
>>>
>>>> Plus in a way isn't the flink-jdbc connector kinda generic? At least
>>>> the older one didn't seem to be server specific.
>>>>
>>>> On Sat, Apr 30, 2022 at 10:04 PM John Smith 
>>>> wrote:
>>>>
>>>>> Hi Martin, is there anything I need to check for?
>>>>>
>>>>> On Tue, Apr 26, 2022 at 9:50 PM John Smith 
>>>>> wrote:
>>>>>
>>>>>> Yeah based off the flink JDBC output format...
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 26, 2022 at 10:05 AM Martijn Visser <
>>>>>> martijnvis...@apache.org> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> Have you built your own JDBC MSSQL source or sink or perhaps a CDC
>>>>>>> driver? Because I'm not aware of a Flink Microsoft SQL Server JDBC 
>>>>>>> driver.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Martijn Visser
>>>>>>> https://twitter.com/MartijnVisser82
>>>>>>> https://github.com/MartijnVisser
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 26 Apr 2022 at 16:01, John Smith 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi running 1.14.4
>>>>>>>>
>>>>>>>> Logs included:
>>>>>>>> https://www.dropbox.com/s/8zjndt5rzd9o80f/flink-flink-taskexecutor-138-task-0002.log?dl=0
>>>>>>>>
>>>>>>>> 1- My task managers shut down with: Terminating TaskManagerRunner
>>>>>>>> with exit code 1.
>>>>>>>> 2- It seems to happen at the same time every day. Which leads me to
>>>>>>>> believe it's our database indexing (See below for reasoning of this).
>>>>>>>> 3- Most of our jobs are ETL from Kafka to SQL Server.
>>>>>>>> 4- We see the following exceptions in the logs:
>>>>>>>>   - Task 'Sink: jdbc (1/1)#10' did not react to cancelling
>>>>>>>> signal - interrupting; it is stuck for 30 seconds in method:
>>>>>>>> ... com.microsoft.sqlserver.jdbc.TDSChannel ...
>>>>>>>>   - Sink: jdbc (1/1)#9 (3aaf6d8a45df6c43198bc8297b42354c)
>>>>>>>> switched from RUNNING to FAILED with failure cause:
>>>>>>>> org.apache.flink.util.FlinkException: Disconnect from JobManager
>>>>>>>> responsible for ...
>>>>>>>> 5- Also seeing this: Failed to close consumer network client with
>>>>>>>> type org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>> org/apache/kafka/common/network/Selector$CloseMode
>>>>>>>>
>>>>>>>> So what I'm guessing is happening is the indexing is blocking the
>>>>>>>> job and the task manager cannot cleanly remove the job and finally 
>>>>>>>> after a
>>>>>>>> while it decides to shut down completely?
>>>>>>>>
>>>>>>>> Is there a way to pause the stream and restart at a later time
>>>>>>>> knowing that this happens always at the same wall clock time? Or maybe
>>>>>>>> allow the JDBC to cleanly shutdown with a timeout?
>>>>>>>>
>>>>>>>>
>>>>>>>>


Re: Task manager shutting down.

2022-05-04 Thread John Smith
So I know specifically, it's the indexing and I put setQueryTimeout. So the
job fails. And goes into retry. That's fine.

But just wondering is there a way to pause the stream at a specified
time/checkpoint and then resume after a specified time?

On Wed, May 4, 2022 at 10:23 AM Martijn Visser 
wrote:

> Hi John,
>
> It is generic, but each database has its own dialect implementation
> because they all have their differences unfortunately :)
>
> I wish I knew how I could help you out here. Perhaps some of the JDBC
> maintainers could chip in.
>
> Best regards,
>
> Martijn
>
> On Sun, 1 May 2022 at 04:06, John Smith  wrote:
>
>> Plus in a way isn't the flink-jdbc connector kinda generic? At least the
>> older one didn't seem to be server specific.
>>
>> On Sat, Apr 30, 2022 at 10:04 PM John Smith 
>> wrote:
>>
>>> Hi Martin, is there anything I need to check for?
>>>
>>> On Tue, Apr 26, 2022 at 9:50 PM John Smith 
>>> wrote:
>>>
>>>> Yeah based off the flink JDBC output format...
>>>>
>>>>
>>>> On Tue, Apr 26, 2022 at 10:05 AM Martijn Visser <
>>>> martijnvis...@apache.org> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> Have you built your own JDBC MSSQL source or sink or perhaps a CDC
>>>>> driver? Because I'm not aware of a Flink Microsoft SQL Server JDBC driver.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Martijn Visser
>>>>> https://twitter.com/MartijnVisser82
>>>>> https://github.com/MartijnVisser
>>>>>
>>>>>
>>>>> On Tue, 26 Apr 2022 at 16:01, John Smith 
>>>>> wrote:
>>>>>
>>>>>> Hi running 1.14.4
>>>>>>
>>>>>> Logs included:
>>>>>> https://www.dropbox.com/s/8zjndt5rzd9o80f/flink-flink-taskexecutor-138-task-0002.log?dl=0
>>>>>>
>>>>>> 1- My task managers shut down with: Terminating TaskManagerRunner
>>>>>> with exit code 1.
>>>>>> 2- It seems to happen at the same time every day. Which leads me to
>>>>>> believe it's our database indexing (See below for reasoning of this).
>>>>>> 3- Most of our jobs are ETL from Kafka to SQL Server.
>>>>>> 4- We see the following exceptions in the logs:
>>>>>>   - Task 'Sink: jdbc (1/1)#10' did not react to cancelling signal
>>>>>> - interrupting; it is stuck for 30 seconds in method:
>>>>>> ... com.microsoft.sqlserver.jdbc.TDSChannel ...
>>>>>>   - Sink: jdbc (1/1)#9 (3aaf6d8a45df6c43198bc8297b42354c)
>>>>>> switched from RUNNING to FAILED with failure cause:
>>>>>> org.apache.flink.util.FlinkException: Disconnect from JobManager
>>>>>> responsible for ...
>>>>>> 5- Also seeing this: Failed to close consumer network client with
>>>>>> type org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
>>>>>> java.lang.NoClassDefFoundError:
>>>>>> org/apache/kafka/common/network/Selector$CloseMode
>>>>>>
>>>>>> So what I'm guessing is happening is the indexing is blocking the job
>>>>>> and the task manager cannot cleanly remove the job and finally after a
>>>>>> while it decides to shut down completely?
>>>>>>
>>>>>> Is there a way to pause the stream and restart at a later time
>>>>>> knowing that this happens always at the same wall clock time? Or maybe
>>>>>> allow the JDBC to cleanly shutdown with a timeout?
>>>>>>
>>>>>>
>>>>>>


Re: How to debug Metaspace exception?

2022-05-02 Thread John Smith
Ok, I don't think I'm running user code on the job manager. Basically. I'm
running a standalone cluster.

3 zookeepers
3 job managers
3 task managers.

I submit my jobs via the UI.

But in case I'll copy the config iver to the job managers.



On Mon, May 2, 2022 at 11:00 AM Chesnay Schepler  wrote:

> There are cases where user-code is run on the JobManager.
> I'm not sure whether though that applies to the JDBC sources.
>
> On 02/05/2022 15:45, John Smith wrote:
>
> Why do the JDBC jars need to be on the job manager node though?
>
> On Mon, May 2, 2022 at 9:36 AM Chesnay Schepler 
> wrote:
>
>> yes.
>> But if you can ensure that the driver isn't bundled by any user-jar you
>> can also skip the pattern configuration step.
>>
>> The pattern looks correct formatting-wise; you could try whether
>> com.microsoft.sqlserver.jdbc. is enough to solve the issue.
>>
>> On 02/05/2022 14:41, John Smith wrote:
>>
>> Oh, so I should copy the jars to the lib folder and
>> set classloader.parent-first-patterns.additional:
>> "org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the task
>> managers and job managers?
>>
>> Also is my pattern correct?
>> "org.apache.ignite.;com.microsoft.sqlserver.jdbc."
>>
>> Just to be sure I'm running a standalone cluster using zookeeper. So I
>> have 3 zookeepers, 3 job managers and 3 task managers.
>>
>>
>> On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler 
>> wrote:
>>
>>> And you do should make sure that it is set for both processes!
>>>
>>> On 02/05/2022 08:43, Chesnay Schepler wrote:
>>>
>>> The setting itself isn't taskmanager specific; it applies to both the
>>> job- and taskmanager process.
>>>
>>> On 02/05/2022 05:29, John Smith wrote:
>>>
>>> Also just to be sure this is a Task Manager setting right?
>>>
>>> On Thu, Apr 28, 2022 at 11:13 AM John Smith 
>>> wrote:
>>>
>>>> I assume you will take action on your side to track and fix the doc? :)
>>>>
>>>> On Thu, Apr 28, 2022 at 11:12 AM John Smith 
>>>> wrote:
>>>>
>>>>> Ok so to summarize...
>>>>>
>>>>> - Build my job jar and have the JDBC driver as a compile only
>>>>> dependency and copy the JDBC driver to flink lib folder.
>>>>>
>>>>> Or
>>>>>
>>>>> - Build my job jar and include JDBC driver in the shadow, plus copy
>>>>> the JDBC driver in the flink lib folder, plus  make an entry in config for
>>>>> classloader.parent-first-patterns-additional
>>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>>>
>>>>>
>>>>> On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
>>>>> wrote:
>>>>>
>>>>>> I think what I meant was "either add it to /lib, or [if it is already
>>>>>> in /lib but also bundled in the jar] add it to the parent-first 
>>>>>> patterns."
>>>>>>
>>>>>> On 28/04/2022 15:56, Chesnay Schepler wrote:
>>>>>>
>>>>>> Pretty sure, even though I seemingly documented it incorrectly :)
>>>>>>
>>>>>> On 28/04/2022 15:49, John Smith wrote:
>>>>>>
>>>>>> You sure?
>>>>>>
>>>>>>-
>>>>>>
>>>>>>*JDBC*: JDBC drivers leak references outside the user code
>>>>>>classloader. To ensure that these classes are only loaded once you 
>>>>>> should
>>>>>>either add the driver jars to Flink’s lib/ folder, or add the
>>>>>>driver classes to the list of parent-first loaded class via
>>>>>>classloader.parent-first-patterns-additional
>>>>>>
>>>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>>>>.
>>>>>>
>>>>>>It says either or
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
>>>>>> wrote:
>>>>>>
>>>>>>> You're misinterpreting the docs.
>>>>>>>
>>>>>>> The parent/child-first classloading controls where F

Re: How to debug Metaspace exception?

2022-05-02 Thread John Smith
Why do the JDBC jars need to be on the job manager node though?

On Mon, May 2, 2022 at 9:36 AM Chesnay Schepler  wrote:

> yes.
> But if you can ensure that the driver isn't bundled by any user-jar you
> can also skip the pattern configuration step.
>
> The pattern looks correct formatting-wise; you could try whether
> com.microsoft.sqlserver.jdbc. is enough to solve the issue.
>
> On 02/05/2022 14:41, John Smith wrote:
>
> Oh, so I should copy the jars to the lib folder and
> set classloader.parent-first-patterns.additional:
> "org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the task
> managers and job managers?
>
> Also is my pattern correct?
> "org.apache.ignite.;com.microsoft.sqlserver.jdbc."
>
> Just to be sure I'm running a standalone cluster using zookeeper. So I
> have 3 zookeepers, 3 job managers and 3 task managers.
>
>
> On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler 
> wrote:
>
>> And you do should make sure that it is set for both processes!
>>
>> On 02/05/2022 08:43, Chesnay Schepler wrote:
>>
>> The setting itself isn't taskmanager specific; it applies to both the
>> job- and taskmanager process.
>>
>> On 02/05/2022 05:29, John Smith wrote:
>>
>> Also just to be sure this is a Task Manager setting right?
>>
>> On Thu, Apr 28, 2022 at 11:13 AM John Smith 
>> wrote:
>>
>>> I assume you will take action on your side to track and fix the doc? :)
>>>
>>> On Thu, Apr 28, 2022 at 11:12 AM John Smith 
>>> wrote:
>>>
>>>> Ok so to summarize...
>>>>
>>>> - Build my job jar and have the JDBC driver as a compile only
>>>> dependency and copy the JDBC driver to flink lib folder.
>>>>
>>>> Or
>>>>
>>>> - Build my job jar and include JDBC driver in the shadow, plus copy the
>>>> JDBC driver in the flink lib folder, plus  make an entry in config for
>>>> classloader.parent-first-patterns-additional
>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>>
>>>>
>>>> On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
>>>> wrote:
>>>>
>>>>> I think what I meant was "either add it to /lib, or [if it is already
>>>>> in /lib but also bundled in the jar] add it to the parent-first patterns."
>>>>>
>>>>> On 28/04/2022 15:56, Chesnay Schepler wrote:
>>>>>
>>>>> Pretty sure, even though I seemingly documented it incorrectly :)
>>>>>
>>>>> On 28/04/2022 15:49, John Smith wrote:
>>>>>
>>>>> You sure?
>>>>>
>>>>>-
>>>>>
>>>>>*JDBC*: JDBC drivers leak references outside the user code
>>>>>classloader. To ensure that these classes are only loaded once you 
>>>>> should
>>>>>either add the driver jars to Flink’s lib/ folder, or add the
>>>>>driver classes to the list of parent-first loaded class via
>>>>>classloader.parent-first-patterns-additional
>>>>>
>>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>>>.
>>>>>
>>>>>It says either or
>>>>>
>>>>>
>>>>> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
>>>>> wrote:
>>>>>
>>>>>> You're misinterpreting the docs.
>>>>>>
>>>>>> The parent/child-first classloading controls where Flink looks for a
>>>>>> class *first*, specifically whether we first load from /lib or the
>>>>>> user-jar.
>>>>>> It does not allow you to load something from the user-jar in the
>>>>>> parent classloader. That's just not how it works.
>>>>>>
>>>>>> It must be in /lib.
>>>>>>
>>>>>> On 27/04/2022 04:59, John Smith wrote:
>>>>>>
>>>>>> Hi Chesnay as per the docs...
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>>>>>
>>>>>> You can either put the jars in task manager lib folder or use
>>>>>> classloader.parent-first-patterns-additional
>>>>>> <https://nightlies.apa

Re: How to debug Metaspace exception?

2022-05-02 Thread John Smith
Oh, so I should copy the jars to the lib folder and
set classloader.parent-first-patterns.additional:
"org.apache.ignite.;com.microsoft.sqlserver.jdbc." to both the task
managers and job managers?

Also is my pattern correct?
"org.apache.ignite.;com.microsoft.sqlserver.jdbc."

Just to be sure I'm running a standalone cluster using zookeeper. So I have
3 zookeepers, 3 job managers and 3 task managers.


On Mon, May 2, 2022 at 2:57 AM Chesnay Schepler  wrote:

> And you do should make sure that it is set for both processes!
>
> On 02/05/2022 08:43, Chesnay Schepler wrote:
>
> The setting itself isn't taskmanager specific; it applies to both the job-
> and taskmanager process.
>
> On 02/05/2022 05:29, John Smith wrote:
>
> Also just to be sure this is a Task Manager setting right?
>
> On Thu, Apr 28, 2022 at 11:13 AM John Smith 
> wrote:
>
>> I assume you will take action on your side to track and fix the doc? :)
>>
>> On Thu, Apr 28, 2022 at 11:12 AM John Smith 
>> wrote:
>>
>>> Ok so to summarize...
>>>
>>> - Build my job jar and have the JDBC driver as a compile only
>>> dependency and copy the JDBC driver to flink lib folder.
>>>
>>> Or
>>>
>>> - Build my job jar and include JDBC driver in the shadow, plus copy the
>>> JDBC driver in the flink lib folder, plus  make an entry in config for
>>> classloader.parent-first-patterns-additional
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>
>>>
>>> On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> I think what I meant was "either add it to /lib, or [if it is already
>>>> in /lib but also bundled in the jar] add it to the parent-first patterns."
>>>>
>>>> On 28/04/2022 15:56, Chesnay Schepler wrote:
>>>>
>>>> Pretty sure, even though I seemingly documented it incorrectly :)
>>>>
>>>> On 28/04/2022 15:49, John Smith wrote:
>>>>
>>>> You sure?
>>>>
>>>>-
>>>>
>>>>*JDBC*: JDBC drivers leak references outside the user code
>>>>classloader. To ensure that these classes are only loaded once you 
>>>> should
>>>>either add the driver jars to Flink’s lib/ folder, or add the
>>>>driver classes to the list of parent-first loaded class via
>>>>classloader.parent-first-patterns-additional
>>>>
>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>>    .
>>>>
>>>>It says either or
>>>>
>>>>
>>>> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
>>>> wrote:
>>>>
>>>>> You're misinterpreting the docs.
>>>>>
>>>>> The parent/child-first classloading controls where Flink looks for a
>>>>> class *first*, specifically whether we first load from /lib or the
>>>>> user-jar.
>>>>> It does not allow you to load something from the user-jar in the
>>>>> parent classloader. That's just not how it works.
>>>>>
>>>>> It must be in /lib.
>>>>>
>>>>> On 27/04/2022 04:59, John Smith wrote:
>>>>>
>>>>> Hi Chesnay as per the docs...
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>>>>
>>>>> You can either put the jars in task manager lib folder or use
>>>>> classloader.parent-first-patterns-additional
>>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>>>
>>>>> I prefer the latter like this: the dependency stays with the user-jar
>>>>> and not on the task manager.
>>>>>
>>>>> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
>>>>> wrote:
>>>>>
>>>>>> Ok so I should put the Apache ignite and my Microsoft drivers in the
>>>>>> lib folders of my task managers?
>>>>>>
>>>>>> And then in my job jar only include them as compile time
>>>>>> dependencies?
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
>>>>>> wro

Re: How to debug Metaspace exception?

2022-05-01 Thread John Smith
Also just to be sure this is a Task Manager setting right?

On Thu, Apr 28, 2022 at 11:13 AM John Smith  wrote:

> I assume you will take action on your side to track and fix the doc? :)
>
> On Thu, Apr 28, 2022 at 11:12 AM John Smith 
> wrote:
>
>> Ok so to summarize...
>>
>> - Build my job jar and have the JDBC driver as a compile only
>> dependency and copy the JDBC driver to flink lib folder.
>>
>> Or
>>
>> - Build my job jar and include JDBC driver in the shadow, plus copy the
>> JDBC driver in the flink lib folder, plus  make an entry in config for
>> classloader.parent-first-patterns-additional
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>
>>
>> On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
>> wrote:
>>
>>> I think what I meant was "either add it to /lib, or [if it is already in
>>> /lib but also bundled in the jar] add it to the parent-first patterns."
>>>
>>> On 28/04/2022 15:56, Chesnay Schepler wrote:
>>>
>>> Pretty sure, even though I seemingly documented it incorrectly :)
>>>
>>> On 28/04/2022 15:49, John Smith wrote:
>>>
>>> You sure?
>>>
>>>-
>>>
>>>*JDBC*: JDBC drivers leak references outside the user code
>>>classloader. To ensure that these classes are only loaded once you should
>>>either add the driver jars to Flink’s lib/ folder, or add the driver
>>>classes to the list of parent-first loaded class via
>>>classloader.parent-first-patterns-additional
>>>
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>.
>>>
>>>It says either or
>>>
>>>
>>> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> You're misinterpreting the docs.
>>>>
>>>> The parent/child-first classloading controls where Flink looks for a
>>>> class *first*, specifically whether we first load from /lib or the
>>>> user-jar.
>>>> It does not allow you to load something from the user-jar in the parent
>>>> classloader. That's just not how it works.
>>>>
>>>> It must be in /lib.
>>>>
>>>> On 27/04/2022 04:59, John Smith wrote:
>>>>
>>>> Hi Chesnay as per the docs...
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>>>
>>>> You can either put the jars in task manager lib folder or use
>>>> classloader.parent-first-patterns-additional
>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>>
>>>> I prefer the latter like this: the dependency stays with the user-jar
>>>> and not on the task manager.
>>>>
>>>> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
>>>> wrote:
>>>>
>>>>> Ok so I should put the Apache ignite and my Microsoft drivers in the
>>>>> lib folders of my task managers?
>>>>>
>>>>> And then in my job jar only include them as compile time dependencies?
>>>>>
>>>>>
>>>>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
>>>>> wrote:
>>>>>
>>>>>> JDBC drivers are well-known for leaking classloaders unfortunately.
>>>>>>
>>>>>> You have correctly identified your alternatives.
>>>>>>
>>>>>> You must put the jdbc driver into /lib instead. Setting only the
>>>>>> parent-first pattern shouldn't affect anything.
>>>>>> That is only relevant if something is in both in /lib and the
>>>>>> user-jar, telling Flink to prioritize what is in lib.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 26/04/2022 15:35, John Smith wrote:
>>>>>>
>>>>>> So I put classloader.parent-first-patterns.additional:
>>>>>> "org.apache.ignite." in the task config and so far I don't think I'm
>>>>>> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>>>>>>
>>>>>> Or it's too early to tell.
>>>>>>
>>>>>> Though now, the tas

Re: Task manager shutting down.

2022-04-30 Thread John Smith
Plus in a way isn't the flink-jdbc connector kinda generic? At least the
older one didn't seem to be server specific.

On Sat, Apr 30, 2022 at 10:04 PM John Smith  wrote:

> Hi Martin, is there anything I need to check for?
>
> On Tue, Apr 26, 2022 at 9:50 PM John Smith  wrote:
>
>> Yeah based off the flink JDBC output format...
>>
>>
>> On Tue, Apr 26, 2022 at 10:05 AM Martijn Visser 
>> wrote:
>>
>>> Hi John,
>>>
>>> Have you built your own JDBC MSSQL source or sink or perhaps a CDC
>>> driver? Because I'm not aware of a Flink Microsoft SQL Server JDBC driver.
>>>
>>> Best regards,
>>>
>>> Martijn Visser
>>> https://twitter.com/MartijnVisser82
>>> https://github.com/MartijnVisser
>>>
>>>
>>> On Tue, 26 Apr 2022 at 16:01, John Smith  wrote:
>>>
>>>> Hi running 1.14.4
>>>>
>>>> Logs included:
>>>> https://www.dropbox.com/s/8zjndt5rzd9o80f/flink-flink-taskexecutor-138-task-0002.log?dl=0
>>>>
>>>> 1- My task managers shut down with: Terminating TaskManagerRunner with
>>>> exit code 1.
>>>> 2- It seems to happen at the same time every day. Which leads me to
>>>> believe it's our database indexing (See below for reasoning of this).
>>>> 3- Most of our jobs are ETL from Kafka to SQL Server.
>>>> 4- We see the following exceptions in the logs:
>>>>   - Task 'Sink: jdbc (1/1)#10' did not react to cancelling signal -
>>>> interrupting; it is stuck for 30 seconds in method:
>>>> ... com.microsoft.sqlserver.jdbc.TDSChannel ...
>>>>   - Sink: jdbc (1/1)#9 (3aaf6d8a45df6c43198bc8297b42354c) switched
>>>> from RUNNING to FAILED with failure cause:
>>>> org.apache.flink.util.FlinkException: Disconnect from JobManager
>>>> responsible for ...
>>>> 5- Also seeing this: Failed to close consumer network client with type
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
>>>> java.lang.NoClassDefFoundError:
>>>> org/apache/kafka/common/network/Selector$CloseMode
>>>>
>>>> So what I'm guessing is happening is the indexing is blocking the job
>>>> and the task manager cannot cleanly remove the job and finally after a
>>>> while it decides to shut down completely?
>>>>
>>>> Is there a way to pause the stream and restart at a later time knowing
>>>> that this happens always at the same wall clock time? Or maybe allow the
>>>> JDBC to cleanly shutdown with a timeout?
>>>>
>>>>
>>>>


Re: Task manager shutting down.

2022-04-30 Thread John Smith
Hi Martin, is there anything I need to check for?

On Tue, Apr 26, 2022 at 9:50 PM John Smith  wrote:

> Yeah based off the flink JDBC output format...
>
>
> On Tue, Apr 26, 2022 at 10:05 AM Martijn Visser 
> wrote:
>
>> Hi John,
>>
>> Have you built your own JDBC MSSQL source or sink or perhaps a CDC
>> driver? Because I'm not aware of a Flink Microsoft SQL Server JDBC driver.
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>> https://github.com/MartijnVisser
>>
>>
>> On Tue, 26 Apr 2022 at 16:01, John Smith  wrote:
>>
>>> Hi running 1.14.4
>>>
>>> Logs included:
>>> https://www.dropbox.com/s/8zjndt5rzd9o80f/flink-flink-taskexecutor-138-task-0002.log?dl=0
>>>
>>> 1- My task managers shut down with: Terminating TaskManagerRunner with
>>> exit code 1.
>>> 2- It seems to happen at the same time every day. Which leads me to
>>> believe it's our database indexing (See below for reasoning of this).
>>> 3- Most of our jobs are ETL from Kafka to SQL Server.
>>> 4- We see the following exceptions in the logs:
>>>   - Task 'Sink: jdbc (1/1)#10' did not react to cancelling signal -
>>> interrupting; it is stuck for 30 seconds in method:
>>> ... com.microsoft.sqlserver.jdbc.TDSChannel ...
>>>   - Sink: jdbc (1/1)#9 (3aaf6d8a45df6c43198bc8297b42354c) switched
>>> from RUNNING to FAILED with failure cause:
>>> org.apache.flink.util.FlinkException: Disconnect from JobManager
>>> responsible for ...
>>> 5- Also seeing this: Failed to close consumer network client with type
>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
>>> java.lang.NoClassDefFoundError:
>>> org/apache/kafka/common/network/Selector$CloseMode
>>>
>>> So what I'm guessing is happening is the indexing is blocking the job
>>> and the task manager cannot cleanly remove the job and finally after a
>>> while it decides to shut down completely?
>>>
>>> Is there a way to pause the stream and restart at a later time knowing
>>> that this happens always at the same wall clock time? Or maybe allow the
>>> JDBC to cleanly shutdown with a timeout?
>>>
>>>
>>>


Re: How to debug Metaspace exception?

2022-04-28 Thread John Smith
I assume you will take action on your side to track and fix the doc? :)

On Thu, Apr 28, 2022 at 11:12 AM John Smith  wrote:

> Ok so to summarize...
>
> - Build my job jar and have the JDBC driver as a compile only
> dependency and copy the JDBC driver to flink lib folder.
>
> Or
>
> - Build my job jar and include JDBC driver in the shadow, plus copy the
> JDBC driver in the flink lib folder, plus  make an entry in config for
> classloader.parent-first-patterns-additional
> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>
>
> On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
> wrote:
>
>> I think what I meant was "either add it to /lib, or [if it is already in
>> /lib but also bundled in the jar] add it to the parent-first patterns."
>>
>> On 28/04/2022 15:56, Chesnay Schepler wrote:
>>
>> Pretty sure, even though I seemingly documented it incorrectly :)
>>
>> On 28/04/2022 15:49, John Smith wrote:
>>
>> You sure?
>>
>>-
>>
>>*JDBC*: JDBC drivers leak references outside the user code
>>classloader. To ensure that these classes are only loaded once you should
>>either add the driver jars to Flink’s lib/ folder, or add the driver
>>classes to the list of parent-first loaded class via
>>classloader.parent-first-patterns-additional
>>
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>.
>>
>>It says either or
>>
>>
>> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
>> wrote:
>>
>>> You're misinterpreting the docs.
>>>
>>> The parent/child-first classloading controls where Flink looks for a
>>> class *first*, specifically whether we first load from /lib or the
>>> user-jar.
>>> It does not allow you to load something from the user-jar in the parent
>>> classloader. That's just not how it works.
>>>
>>> It must be in /lib.
>>>
>>> On 27/04/2022 04:59, John Smith wrote:
>>>
>>> Hi Chesnay as per the docs...
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>>
>>> You can either put the jars in task manager lib folder or use
>>> classloader.parent-first-patterns-additional
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>>
>>> I prefer the latter like this: the dependency stays with the user-jar
>>> and not on the task manager.
>>>
>>> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
>>> wrote:
>>>
>>>> Ok so I should put the Apache ignite and my Microsoft drivers in the
>>>> lib folders of my task managers?
>>>>
>>>> And then in my job jar only include them as compile time dependencies?
>>>>
>>>>
>>>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
>>>> wrote:
>>>>
>>>>> JDBC drivers are well-known for leaking classloaders unfortunately.
>>>>>
>>>>> You have correctly identified your alternatives.
>>>>>
>>>>> You must put the jdbc driver into /lib instead. Setting only the
>>>>> parent-first pattern shouldn't affect anything.
>>>>> That is only relevant if something is in both in /lib and the
>>>>> user-jar, telling Flink to prioritize what is in lib.
>>>>>
>>>>>
>>>>>
>>>>> On 26/04/2022 15:35, John Smith wrote:
>>>>>
>>>>> So I put classloader.parent-first-patterns.additional:
>>>>> "org.apache.ignite." in the task config and so far I don't think I'm
>>>>> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>>>>>
>>>>> Or it's too early to tell.
>>>>>
>>>>> Though now, the task managers are shutting down due to some
>>>>> other failures.
>>>>>
>>>>> So maybe because tasks were failing and reloading often the task
>>>>> manager was running out of Metspace. But now maybe it's just
>>>>> cleanly shutting down.
>>>>>
>>>>> On Wed, Apr 20, 2022 at 11:35 AM John Smith 
>>>>> wrote:
>>>>>
>>>>>> Or I can put in the config to treat org.apache.ignite. classes

Re: How to debug Metaspace exception?

2022-04-28 Thread John Smith
Ok so to summarize...

- Build my job jar and have the JDBC driver as a compile only
dependency and copy the JDBC driver to flink lib folder.

Or

- Build my job jar and include JDBC driver in the shadow, plus copy the
JDBC driver in the flink lib folder, plus  make an entry in config for
classloader.parent-first-patterns-additional
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>


On Thu, Apr 28, 2022 at 10:17 AM Chesnay Schepler 
wrote:

> I think what I meant was "either add it to /lib, or [if it is already in
> /lib but also bundled in the jar] add it to the parent-first patterns."
>
> On 28/04/2022 15:56, Chesnay Schepler wrote:
>
> Pretty sure, even though I seemingly documented it incorrectly :)
>
> On 28/04/2022 15:49, John Smith wrote:
>
> You sure?
>
>-
>
>*JDBC*: JDBC drivers leak references outside the user code
>classloader. To ensure that these classes are only loaded once you should
>either add the driver jars to Flink’s lib/ folder, or add the driver
>classes to the list of parent-first loaded class via
>classloader.parent-first-patterns-additional
>
> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>.
>
>It says either or
>
>
> On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler 
> wrote:
>
>> You're misinterpreting the docs.
>>
>> The parent/child-first classloading controls where Flink looks for a
>> class *first*, specifically whether we first load from /lib or the
>> user-jar.
>> It does not allow you to load something from the user-jar in the parent
>> classloader. That's just not how it works.
>>
>> It must be in /lib.
>>
>> On 27/04/2022 04:59, John Smith wrote:
>>
>> Hi Chesnay as per the docs...
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>>
>> You can either put the jars in task manager lib folder or use
>> classloader.parent-first-patterns-additional
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>>
>> I prefer the latter like this: the dependency stays with the user-jar and
>> not on the task manager.
>>
>> On Tue, Apr 26, 2022 at 9:52 PM John Smith 
>> wrote:
>>
>>> Ok so I should put the Apache ignite and my Microsoft drivers in the lib
>>> folders of my task managers?
>>>
>>> And then in my job jar only include them as compile time dependencies?
>>>
>>>
>>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> JDBC drivers are well-known for leaking classloaders unfortunately.
>>>>
>>>> You have correctly identified your alternatives.
>>>>
>>>> You must put the jdbc driver into /lib instead. Setting only the
>>>> parent-first pattern shouldn't affect anything.
>>>> That is only relevant if something is in both in /lib and the user-jar,
>>>> telling Flink to prioritize what is in lib.
>>>>
>>>>
>>>>
>>>> On 26/04/2022 15:35, John Smith wrote:
>>>>
>>>> So I put classloader.parent-first-patterns.additional:
>>>> "org.apache.ignite." in the task config and so far I don't think I'm
>>>> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>>>>
>>>> Or it's too early to tell.
>>>>
>>>> Though now, the task managers are shutting down due to some
>>>> other failures.
>>>>
>>>> So maybe because tasks were failing and reloading often the task
>>>> manager was running out of Metspace. But now maybe it's just
>>>> cleanly shutting down.
>>>>
>>>> On Wed, Apr 20, 2022 at 11:35 AM John Smith 
>>>> wrote:
>>>>
>>>>> Or I can put in the config to treat org.apache.ignite. classes as
>>>>> first class?
>>>>>
>>>>> On Tue, Apr 19, 2022 at 10:18 PM John Smith 
>>>>> wrote:
>>>>>
>>>>>> Ok, so I loaded the dump into Eclipse Mat and followed:
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>>>>
>>>>>> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
>>>>>> - Then I clicked on one of them "Merge Shortest P

Re: How to debug Metaspace exception?

2022-04-28 Thread John Smith
You sure?

   -

   *JDBC*: JDBC drivers leak references outside the user code classloader.
   To ensure that these classes are only loaded once you should either add the
   driver jars to Flink’s lib/ folder, or add the driver classes to the
   list of parent-first loaded class via
   classloader.parent-first-patterns-additional
   
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
   .

   It says either or


On Wed, Apr 27, 2022 at 3:44 AM Chesnay Schepler  wrote:

> You're misinterpreting the docs.
>
> The parent/child-first classloading controls where Flink looks for a class
> *first*, specifically whether we first load from /lib or the user-jar.
> It does not allow you to load something from the user-jar in the parent
> classloader. That's just not how it works.
>
> It must be in /lib.
>
> On 27/04/2022 04:59, John Smith wrote:
>
> Hi Chesnay as per the docs...
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>
> You can either put the jars in task manager lib folder or use
> classloader.parent-first-patterns-additional
> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>
>
> I prefer the latter like this: the dependency stays with the user-jar and
> not on the task manager.
>
> On Tue, Apr 26, 2022 at 9:52 PM John Smith  wrote:
>
>> Ok so I should put the Apache ignite and my Microsoft drivers in the lib
>> folders of my task managers?
>>
>> And then in my job jar only include them as compile time dependencies?
>>
>>
>> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
>> wrote:
>>
>>> JDBC drivers are well-known for leaking classloaders unfortunately.
>>>
>>> You have correctly identified your alternatives.
>>>
>>> You must put the jdbc driver into /lib instead. Setting only the
>>> parent-first pattern shouldn't affect anything.
>>> That is only relevant if something is in both in /lib and the user-jar,
>>> telling Flink to prioritize what is in lib.
>>>
>>>
>>>
>>> On 26/04/2022 15:35, John Smith wrote:
>>>
>>> So I put classloader.parent-first-patterns.additional:
>>> "org.apache.ignite." in the task config and so far I don't think I'm
>>> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>>>
>>> Or it's too early to tell.
>>>
>>> Though now, the task managers are shutting down due to some
>>> other failures.
>>>
>>> So maybe because tasks were failing and reloading often the task manager
>>> was running out of Metspace. But now maybe it's just cleanly shutting down.
>>>
>>> On Wed, Apr 20, 2022 at 11:35 AM John Smith 
>>> wrote:
>>>
>>>> Or I can put in the config to treat org.apache.ignite. classes as first
>>>> class?
>>>>
>>>> On Tue, Apr 19, 2022 at 10:18 PM John Smith 
>>>> wrote:
>>>>
>>>>> Ok, so I loaded the dump into Eclipse Mat and followed:
>>>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>>>
>>>>> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
>>>>> - Then I clicked on one of them "Merge Shortest Path..." and picked
>>>>> "Exclude all phantom/weak/soft references"
>>>>> - Which then gave me: SqlDriverManager > Apache Ignite JdbcThin Driver
>>>>>
>>>>> So i'm guessing anything JDBC based. I should copy into the task
>>>>> manager libs folder and my jobs make the dependencies as compile only?
>>>>>
>>>>> On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko <
>>>>> yaros...@goldsky.io> wrote:
>>>>>
>>>>>> Also
>>>>>> https://shopify.engineering/optimizing-apache-flink-applications-tips
>>>>>> might be helpful (has a section on profiling, as well as classloading).
>>>>>>
>>>>>> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
>>>>>> wrote:
>>>>>>
>>>>>>> We have a very rough "guide" in the wiki (it's just the specific
>>>>>>> steps I took to debug another leak):
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>>>>>
>>>>>&g

Re: How to debug Metaspace exception?

2022-04-26 Thread John Smith
Hi Chesnay as per the docs...
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

You can either put the jars in task manager lib folder or use
classloader.parent-first-patterns-additional
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-additional>

I prefer the latter like this: the dependency stays with the user-jar and
not on the task manager.

On Tue, Apr 26, 2022 at 9:52 PM John Smith  wrote:

> Ok so I should put the Apache ignite and my Microsoft drivers in the lib
> folders of my task managers?
>
> And then in my job jar only include them as compile time dependencies?
>
>
> On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
> wrote:
>
>> JDBC drivers are well-known for leaking classloaders unfortunately.
>>
>> You have correctly identified your alternatives.
>>
>> You must put the jdbc driver into /lib instead. Setting only the
>> parent-first pattern shouldn't affect anything.
>> That is only relevant if something is in both in /lib and the user-jar,
>> telling Flink to prioritize what is in lib.
>>
>>
>>
>> On 26/04/2022 15:35, John Smith wrote:
>>
>> So I put classloader.parent-first-patterns.additional:
>> "org.apache.ignite." in the task config and so far I don't think I'm
>> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>>
>> Or it's too early to tell.
>>
>> Though now, the task managers are shutting down due to some
>> other failures.
>>
>> So maybe because tasks were failing and reloading often the task manager
>> was running out of Metspace. But now maybe it's just cleanly shutting down.
>>
>> On Wed, Apr 20, 2022 at 11:35 AM John Smith 
>> wrote:
>>
>>> Or I can put in the config to treat org.apache.ignite. classes as first
>>> class?
>>>
>>> On Tue, Apr 19, 2022 at 10:18 PM John Smith 
>>> wrote:
>>>
>>>> Ok, so I loaded the dump into Eclipse Mat and followed:
>>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>>
>>>> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
>>>> - Then I clicked on one of them "Merge Shortest Path..." and picked
>>>> "Exclude all phantom/weak/soft references"
>>>> - Which then gave me: SqlDriverManager > Apache Ignite JdbcThin Driver
>>>>
>>>> So i'm guessing anything JDBC based. I should copy into the task
>>>> manager libs folder and my jobs make the dependencies as compile only?
>>>>
>>>> On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko <
>>>> yaros...@goldsky.io> wrote:
>>>>
>>>>> Also
>>>>> https://shopify.engineering/optimizing-apache-flink-applications-tips
>>>>> might be helpful (has a section on profiling, as well as classloading).
>>>>>
>>>>> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
>>>>> wrote:
>>>>>
>>>>>> We have a very rough "guide" in the wiki (it's just the specific
>>>>>> steps I took to debug another leak):
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>>>>
>>>>>> On 19/04/2022 12:01, huweihua wrote:
>>>>>>
>>>>>> Hi, John
>>>>>>
>>>>>> Sorry for the late reply. You can use MAT[1] to analyze the dump
>>>>>> file. Check whether have too many loaded classes.
>>>>>>
>>>>>> [1] https://www.eclipse.org/mat/
>>>>>>
>>>>>> 2022年4月18日 下午9:55,John Smith  写道:
>>>>>>
>>>>>> Hi, can anyone help with this? I never looked at a dump file before.
>>>>>>
>>>>>> On Thu, Apr 14, 2022 at 11:59 AM John Smith 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, so I have a dump file. What do I look for?
>>>>>>>
>>>>>>> On Thu, Mar 31, 2022 at 3:28 PM John Smith 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ok so if there's a leak, if I manually stop the job and restart it
>>>>>>>> from the UI multiple times, I won't see the issue because because the
>>>>>>>> classes are unloaded correctly?
>>>>>>>>
>>>>&g

Re: How to debug Metaspace exception?

2022-04-26 Thread John Smith
Ok so I should put the Apache ignite and my Microsoft drivers in the lib
folders of my task managers?

And then in my job jar only include them as compile time dependencies?


On Tue, Apr 26, 2022 at 10:42 AM Chesnay Schepler 
wrote:

> JDBC drivers are well-known for leaking classloaders unfortunately.
>
> You have correctly identified your alternatives.
>
> You must put the jdbc driver into /lib instead. Setting only the
> parent-first pattern shouldn't affect anything.
> That is only relevant if something is in both in /lib and the user-jar,
> telling Flink to prioritize what is in lib.
>
>
>
> On 26/04/2022 15:35, John Smith wrote:
>
> So I put classloader.parent-first-patterns.additional:
> "org.apache.ignite." in the task config and so far I don't think I'm
> getting "java.lang.OutOfMemoryError: Metaspace" any more.
>
> Or it's too early to tell.
>
> Though now, the task managers are shutting down due to some other failures.
>
> So maybe because tasks were failing and reloading often the task manager
> was running out of Metspace. But now maybe it's just cleanly shutting down.
>
> On Wed, Apr 20, 2022 at 11:35 AM John Smith 
> wrote:
>
>> Or I can put in the config to treat org.apache.ignite. classes as first
>> class?
>>
>> On Tue, Apr 19, 2022 at 10:18 PM John Smith 
>> wrote:
>>
>>> Ok, so I loaded the dump into Eclipse Mat and followed:
>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>
>>> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
>>> - Then I clicked on one of them "Merge Shortest Path..." and picked
>>> "Exclude all phantom/weak/soft references"
>>> - Which then gave me: SqlDriverManager > Apache Ignite JdbcThin Driver
>>>
>>> So i'm guessing anything JDBC based. I should copy into the task manager
>>> libs folder and my jobs make the dependencies as compile only?
>>>
>>> On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko 
>>> wrote:
>>>
>>>> Also
>>>> https://shopify.engineering/optimizing-apache-flink-applications-tips
>>>> might be helpful (has a section on profiling, as well as classloading).
>>>>
>>>> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
>>>> wrote:
>>>>
>>>>> We have a very rough "guide" in the wiki (it's just the specific steps
>>>>> I took to debug another leak):
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>>>
>>>>> On 19/04/2022 12:01, huweihua wrote:
>>>>>
>>>>> Hi, John
>>>>>
>>>>> Sorry for the late reply. You can use MAT[1] to analyze the dump file.
>>>>> Check whether have too many loaded classes.
>>>>>
>>>>> [1] https://www.eclipse.org/mat/
>>>>>
>>>>> 2022年4月18日 下午9:55,John Smith  写道:
>>>>>
>>>>> Hi, can anyone help with this? I never looked at a dump file before.
>>>>>
>>>>> On Thu, Apr 14, 2022 at 11:59 AM John Smith 
>>>>> wrote:
>>>>>
>>>>>> Hi, so I have a dump file. What do I look for?
>>>>>>
>>>>>> On Thu, Mar 31, 2022 at 3:28 PM John Smith 
>>>>>> wrote:
>>>>>>
>>>>>>> Ok so if there's a leak, if I manually stop the job and restart it
>>>>>>> from the UI multiple times, I won't see the issue because because the
>>>>>>> classes are unloaded correctly?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 31, 2022 at 9:20 AM huweihua 
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> The difference is that manually canceling the job stops the
>>>>>>>> JobMaster, but automatic failover keeps the JobMaster running. But 
>>>>>>>> looking
>>>>>>>> on TaskManager, it doesn't make much difference
>>>>>>>>
>>>>>>>>
>>>>>>>> 2022年3月31日 上午4:01,John Smith  写道:
>>>>>>>>
>>>>>>>> Also if I manually cancel and restart the same job over and over is
>>>>>>>> it the same as if flink was restarting a job due to failure?
>>>>>>>>
>>>>>>>> I.e: Wh

Re: Task manager shutting down.

2022-04-26 Thread John Smith
Yeah based off the flink JDBC output format...


On Tue, Apr 26, 2022 at 10:05 AM Martijn Visser 
wrote:

> Hi John,
>
> Have you built your own JDBC MSSQL source or sink or perhaps a CDC driver?
> Because I'm not aware of a Flink Microsoft SQL Server JDBC driver.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
>
> On Tue, 26 Apr 2022 at 16:01, John Smith  wrote:
>
>> Hi running 1.14.4
>>
>> Logs included:
>> https://www.dropbox.com/s/8zjndt5rzd9o80f/flink-flink-taskexecutor-138-task-0002.log?dl=0
>>
>> 1- My task managers shut down with: Terminating TaskManagerRunner with
>> exit code 1.
>> 2- It seems to happen at the same time every day. Which leads me to
>> believe it's our database indexing (See below for reasoning of this).
>> 3- Most of our jobs are ETL from Kafka to SQL Server.
>> 4- We see the following exceptions in the logs:
>>   - Task 'Sink: jdbc (1/1)#10' did not react to cancelling signal -
>> interrupting; it is stuck for 30 seconds in method:
>> ... com.microsoft.sqlserver.jdbc.TDSChannel ...
>>   - Sink: jdbc (1/1)#9 (3aaf6d8a45df6c43198bc8297b42354c) switched
>> from RUNNING to FAILED with failure cause:
>> org.apache.flink.util.FlinkException: Disconnect from JobManager
>> responsible for ...
>> 5- Also seeing this: Failed to close consumer network client with type
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/network/Selector$CloseMode
>>
>> So what I'm guessing is happening is the indexing is blocking the job and
>> the task manager cannot cleanly remove the job and finally after a while it
>> decides to shut down completely?
>>
>> Is there a way to pause the stream and restart at a later time knowing
>> that this happens always at the same wall clock time? Or maybe allow the
>> JDBC to cleanly shutdown with a timeout?
>>
>>
>>


Task manager shutting down.

2022-04-26 Thread John Smith
Hi running 1.14.4

Logs included:
https://www.dropbox.com/s/8zjndt5rzd9o80f/flink-flink-taskexecutor-138-task-0002.log?dl=0

1- My task managers shut down with: Terminating TaskManagerRunner with exit
code 1.
2- It seems to happen at the same time every day. Which leads me to believe
it's our database indexing (See below for reasoning of this).
3- Most of our jobs are ETL from Kafka to SQL Server.
4- We see the following exceptions in the logs:
  - Task 'Sink: jdbc (1/1)#10' did not react to cancelling signal -
interrupting; it is stuck for 30 seconds in method:
... com.microsoft.sqlserver.jdbc.TDSChannel ...
  - Sink: jdbc (1/1)#9 (3aaf6d8a45df6c43198bc8297b42354c) switched from
RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkException:
Disconnect from JobManager responsible for ...
5- Also seeing this: Failed to close consumer network client with type
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
java.lang.NoClassDefFoundError:
org/apache/kafka/common/network/Selector$CloseMode

So what I'm guessing is happening is the indexing is blocking the job and
the task manager cannot cleanly remove the job and finally after a while it
decides to shut down completely?

Is there a way to pause the stream and restart at a later time knowing that
this happens always at the same wall clock time? Or maybe allow the JDBC to
cleanly shutdown with a timeout?


Re: How to debug Metaspace exception?

2022-04-26 Thread John Smith
So I put classloader.parent-first-patterns.additional: "org.apache.ignite."
in the task config and so far I don't think I'm getting
"java.lang.OutOfMemoryError:
Metaspace" any more.

Or it's too early to tell.

Though now, the task managers are shutting down due to some other failures.

So maybe because tasks were failing and reloading often the task manager
was running out of Metspace. But now maybe it's just cleanly shutting down.

On Wed, Apr 20, 2022 at 11:35 AM John Smith  wrote:

> Or I can put in the config to treat org.apache.ignite. classes as first
> class?
>
> On Tue, Apr 19, 2022 at 10:18 PM John Smith 
> wrote:
>
>> Ok, so I loaded the dump into Eclipse Mat and followed:
>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>
>> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
>> - Then I clicked on one of them "Merge Shortest Path..." and picked
>> "Exclude all phantom/weak/soft references"
>> - Which then gave me: SqlDriverManager > Apache Ignite JdbcThin Driver
>>
>> So i'm guessing anything JDBC based. I should copy into the task manager
>> libs folder and my jobs make the dependencies as compile only?
>>
>> On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko 
>> wrote:
>>
>>> Also
>>> https://shopify.engineering/optimizing-apache-flink-applications-tips
>>> might be helpful (has a section on profiling, as well as classloading).
>>>
>>> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> We have a very rough "guide" in the wiki (it's just the specific steps
>>>> I took to debug another leak):
>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>>
>>>> On 19/04/2022 12:01, huweihua wrote:
>>>>
>>>> Hi, John
>>>>
>>>> Sorry for the late reply. You can use MAT[1] to analyze the dump file.
>>>> Check whether have too many loaded classes.
>>>>
>>>> [1] https://www.eclipse.org/mat/
>>>>
>>>> 2022年4月18日 下午9:55,John Smith  写道:
>>>>
>>>> Hi, can anyone help with this? I never looked at a dump file before.
>>>>
>>>> On Thu, Apr 14, 2022 at 11:59 AM John Smith 
>>>> wrote:
>>>>
>>>>> Hi, so I have a dump file. What do I look for?
>>>>>
>>>>> On Thu, Mar 31, 2022 at 3:28 PM John Smith 
>>>>> wrote:
>>>>>
>>>>>> Ok so if there's a leak, if I manually stop the job and restart it
>>>>>> from the UI multiple times, I won't see the issue because because the
>>>>>> classes are unloaded correctly?
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 31, 2022 at 9:20 AM huweihua 
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> The difference is that manually canceling the job stops the
>>>>>>> JobMaster, but automatic failover keeps the JobMaster running. But 
>>>>>>> looking
>>>>>>> on TaskManager, it doesn't make much difference
>>>>>>>
>>>>>>>
>>>>>>> 2022年3月31日 上午4:01,John Smith  写道:
>>>>>>>
>>>>>>> Also if I manually cancel and restart the same job over and over is
>>>>>>> it the same as if flink was restarting a job due to failure?
>>>>>>>
>>>>>>> I.e: When I click "Cancel Job" on the UI is the job completely
>>>>>>> unloaded vs when the job scheduler restarts a job because if whatever
>>>>>>> reason?
>>>>>>>
>>>>>>> Lile this I'll stop and restart the job a few times or maybe I can
>>>>>>> trick my job to fail and have the scheduler restart it. Ok let me think
>>>>>>> about this...
>>>>>>>
>>>>>>> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>>>>>>>
>>>>>>>> So if I run the same jobs in my dev env will I still be able to see
>>>>>>>> the similar dump?
>>>>>>>>
>>>>>>>> I think running the same job in dev should be reproducible, maybe
>>>>>>>> you can have a try.
>>>>>>>>
>>>>>>>>  If not I would hav

Re: How to debug Metaspace exception?

2022-04-20 Thread John Smith
Or I can put in the config to treat org.apache.ignite. classes as first
class?

On Tue, Apr 19, 2022 at 10:18 PM John Smith  wrote:

> Ok, so I loaded the dump into Eclipse Mat and followed:
> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>
> - On the Histogram, I got over 30 entries for: ChildFirstClassLoader
> - Then I clicked on one of them "Merge Shortest Path..." and picked
> "Exclude all phantom/weak/soft references"
> - Which then gave me: SqlDriverManager > Apache Ignite JdbcThin Driver
>
> So i'm guessing anything JDBC based. I should copy into the task manager
> libs folder and my jobs make the dependencies as compile only?
>
> On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko 
> wrote:
>
>> Also
>> https://shopify.engineering/optimizing-apache-flink-applications-tips
>> might be helpful (has a section on profiling, as well as classloading).
>>
>> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
>> wrote:
>>
>>> We have a very rough "guide" in the wiki (it's just the specific steps I
>>> took to debug another leak):
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>>
>>> On 19/04/2022 12:01, huweihua wrote:
>>>
>>> Hi, John
>>>
>>> Sorry for the late reply. You can use MAT[1] to analyze the dump file.
>>> Check whether have too many loaded classes.
>>>
>>> [1] https://www.eclipse.org/mat/
>>>
>>> 2022年4月18日 下午9:55,John Smith  写道:
>>>
>>> Hi, can anyone help with this? I never looked at a dump file before.
>>>
>>> On Thu, Apr 14, 2022 at 11:59 AM John Smith 
>>> wrote:
>>>
>>>> Hi, so I have a dump file. What do I look for?
>>>>
>>>> On Thu, Mar 31, 2022 at 3:28 PM John Smith 
>>>> wrote:
>>>>
>>>>> Ok so if there's a leak, if I manually stop the job and restart it
>>>>> from the UI multiple times, I won't see the issue because because the
>>>>> classes are unloaded correctly?
>>>>>
>>>>>
>>>>> On Thu, Mar 31, 2022 at 9:20 AM huweihua 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> The difference is that manually canceling the job stops the
>>>>>> JobMaster, but automatic failover keeps the JobMaster running. But 
>>>>>> looking
>>>>>> on TaskManager, it doesn't make much difference
>>>>>>
>>>>>>
>>>>>> 2022年3月31日 上午4:01,John Smith  写道:
>>>>>>
>>>>>> Also if I manually cancel and restart the same job over and over is
>>>>>> it the same as if flink was restarting a job due to failure?
>>>>>>
>>>>>> I.e: When I click "Cancel Job" on the UI is the job completely
>>>>>> unloaded vs when the job scheduler restarts a job because if whatever
>>>>>> reason?
>>>>>>
>>>>>> Lile this I'll stop and restart the job a few times or maybe I can
>>>>>> trick my job to fail and have the scheduler restart it. Ok let me think
>>>>>> about this...
>>>>>>
>>>>>> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>>>>>>
>>>>>>> So if I run the same jobs in my dev env will I still be able to see
>>>>>>> the similar dump?
>>>>>>>
>>>>>>> I think running the same job in dev should be reproducible, maybe
>>>>>>> you can have a try.
>>>>>>>
>>>>>>>  If not I would have to wait at a low volume time to do it on
>>>>>>> production. Aldo if I recall the dump is as big as the JVM memory right 
>>>>>>> so
>>>>>>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>>>>>>
>>>>>>> Yes, JMAP will pause the JVM, the time of pause depends on the size
>>>>>>> to dump. you can use "jmap -dump:live" to dump only the reachable 
>>>>>>> objects,
>>>>>>> this will take a brief pause
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2022年3月30日 下午9:47,John Smith  写道:
>>>>>>>
>>>>>>> I have 3 task managers (see config below). There is total of 10 jobs
>>&

Re: How to debug Metaspace exception?

2022-04-19 Thread John Smith
Ok, so I loaded the dump into Eclipse Mat and followed:
https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for: ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest Path..." and picked
"Exclude all phantom/weak/soft references"
- Which then gave me: SqlDriverManager > Apache Ignite JdbcThin Driver

So i'm guessing anything JDBC based. I should copy into the task manager
libs folder and my jobs make the dependencies as compile only?

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko 
wrote:

> Also https://shopify.engineering/optimizing-apache-flink-applications-tips
> might be helpful (has a section on profiling, as well as classloading).
>
> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
> wrote:
>
>> We have a very rough "guide" in the wiki (it's just the specific steps I
>> took to debug another leak):
>>
>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>
>> On 19/04/2022 12:01, huweihua wrote:
>>
>> Hi, John
>>
>> Sorry for the late reply. You can use MAT[1] to analyze the dump file.
>> Check whether have too many loaded classes.
>>
>> [1] https://www.eclipse.org/mat/
>>
>> 2022年4月18日 下午9:55,John Smith  写道:
>>
>> Hi, can anyone help with this? I never looked at a dump file before.
>>
>> On Thu, Apr 14, 2022 at 11:59 AM John Smith 
>> wrote:
>>
>>> Hi, so I have a dump file. What do I look for?
>>>
>>> On Thu, Mar 31, 2022 at 3:28 PM John Smith 
>>> wrote:
>>>
>>>> Ok so if there's a leak, if I manually stop the job and restart it from
>>>> the UI multiple times, I won't see the issue because because the classes
>>>> are unloaded correctly?
>>>>
>>>>
>>>> On Thu, Mar 31, 2022 at 9:20 AM huweihua 
>>>> wrote:
>>>>
>>>>>
>>>>> The difference is that manually canceling the job stops the JobMaster,
>>>>> but automatic failover keeps the JobMaster running. But looking on
>>>>> TaskManager, it doesn't make much difference
>>>>>
>>>>>
>>>>> 2022年3月31日 上午4:01,John Smith  写道:
>>>>>
>>>>> Also if I manually cancel and restart the same job over and over is it
>>>>> the same as if flink was restarting a job due to failure?
>>>>>
>>>>> I.e: When I click "Cancel Job" on the UI is the job completely
>>>>> unloaded vs when the job scheduler restarts a job because if whatever
>>>>> reason?
>>>>>
>>>>> Lile this I'll stop and restart the job a few times or maybe I can
>>>>> trick my job to fail and have the scheduler restart it. Ok let me think
>>>>> about this...
>>>>>
>>>>> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>>>>>
>>>>>> So if I run the same jobs in my dev env will I still be able to see
>>>>>> the similar dump?
>>>>>>
>>>>>> I think running the same job in dev should be reproducible, maybe you
>>>>>> can have a try.
>>>>>>
>>>>>>  If not I would have to wait at a low volume time to do it on
>>>>>> production. Aldo if I recall the dump is as big as the JVM memory right 
>>>>>> so
>>>>>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>>>>>
>>>>>> Yes, JMAP will pause the JVM, the time of pause depends on the size
>>>>>> to dump. you can use "jmap -dump:live" to dump only the reachable 
>>>>>> objects,
>>>>>> this will take a brief pause
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2022年3月30日 下午9:47,John Smith  写道:
>>>>>>
>>>>>> I have 3 task managers (see config below). There is total of 10 jobs
>>>>>> with 25 slots being used.
>>>>>> The jobs are 100% ETL I.e; They load Json, transform it and push it
>>>>>> to JDBC, only 1 job of the 10 is pushing to Apache Ignite cluster.
>>>>>>
>>>>>> FOR JMAP. I know that it will pause the task manager. So if I run the
>>>>>> same jobs in my dev env will I still be able to see the similar dump? I I
>>>>>> assume so. If not I would have to wait at a low volume time to do it on
>>>>>> produc

Re: How to debug Metaspace exception?

2022-04-18 Thread John Smith
Hi, can anyone help with this? I never looked at a dump file before.

On Thu, Apr 14, 2022 at 11:59 AM John Smith  wrote:

> Hi, so I have a dump file. What do I look for?
>
> On Thu, Mar 31, 2022 at 3:28 PM John Smith  wrote:
>
>> Ok so if there's a leak, if I manually stop the job and restart it from
>> the UI multiple times, I won't see the issue because because the classes
>> are unloaded correctly?
>>
>>
>> On Thu, Mar 31, 2022 at 9:20 AM huweihua  wrote:
>>
>>>
>>> The difference is that manually canceling the job stops the JobMaster,
>>> but automatic failover keeps the JobMaster running. But looking on
>>> TaskManager, it doesn't make much difference
>>>
>>>
>>> 2022年3月31日 上午4:01,John Smith  写道:
>>>
>>> Also if I manually cancel and restart the same job over and over is it
>>> the same as if flink was restarting a job due to failure?
>>>
>>> I.e: When I click "Cancel Job" on the UI is the job completely unloaded
>>> vs when the job scheduler restarts a job because if whatever reason?
>>>
>>> Lile this I'll stop and restart the job a few times or maybe I can trick
>>> my job to fail and have the scheduler restart it. Ok let me think about
>>> this...
>>>
>>> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>>>
>>>> So if I run the same jobs in my dev env will I still be able to see the
>>>> similar dump?
>>>>
>>>> I think running the same job in dev should be reproducible, maybe you
>>>> can have a try.
>>>>
>>>>  If not I would have to wait at a low volume time to do it on
>>>> production. Aldo if I recall the dump is as big as the JVM memory right so
>>>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>>>
>>>> Yes, JMAP will pause the JVM, the time of pause depends on the size to
>>>> dump. you can use "jmap -dump:live" to dump only the reachable objects,
>>>> this will take a brief pause
>>>>
>>>>
>>>>
>>>> 2022年3月30日 下午9:47,John Smith  写道:
>>>>
>>>> I have 3 task managers (see config below). There is total of 10 jobs
>>>> with 25 slots being used.
>>>> The jobs are 100% ETL I.e; They load Json, transform it and push it to
>>>> JDBC, only 1 job of the 10 is pushing to Apache Ignite cluster.
>>>>
>>>> FOR JMAP. I know that it will pause the task manager. So if I run the
>>>> same jobs in my dev env will I still be able to see the similar dump? I I
>>>> assume so. If not I would have to wait at a low volume time to do it on
>>>> production. Aldo if I recall the dump is as big as the JVM memory right so
>>>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>>>
>>>>
>>>> # Operating system has 16GB total.
>>>> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>>>>
>>>> cluster.evenly-spread-out-slots: true
>>>>
>>>> taskmanager.memory.flink.size: 10240m
>>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>>> taskmanager.numberOfTaskSlots: 16
>>>> parallelism.default: 1
>>>>
>>>> high-availability: zookeeper
>>>> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
>>>> high-availability.zookeeper.quorum: ...
>>>> high-availability.zookeeper.path.root: /flink_1_14
>>>> high-availability.cluster-id: /flink_1_14_cluster_0001
>>>>
>>>> web.upload.dir: /mnt/flink/uploads/flink_1_14
>>>>
>>>> state.backend: rocksdb
>>>> state.backend.incremental: true
>>>> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
>>>> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>>>>
>>>> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:
>>>>
>>>>> Hi, John
>>>>>
>>>>> Could you tell us you application scenario? Is it a flink session
>>>>> cluster with a lot of jobs?
>>>>>
>>>>> Maybe you can try to dump the memory with jmap and use tools such as
>>>>> MAT to analyze whether there are abnormal classes and classloaders
>>>>>
>>>>>
>>>>> > 2022年3月30日 上午6:09,John Smith  写道:
>>>>> >
>>>>> > Hi running 1.14.4
>>>>> >
>>>>> > My tasks manager still fails with java.lang.OutOfMemoryError:
>>>>> Metaspace. The metaspace out-of-memory error has occurred. This can mean
>>>>> two things: either the job requires a larger size of JVM metaspace to load
>>>>> classes or there is a class loading leak.
>>>>> >
>>>>> > I have 2GB of metaspace configed
>>>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>>>> >
>>>>> > But the task nodes still fail.
>>>>> >
>>>>> > When looking at the UI metrics, the metaspace starts low. Now I see
>>>>> 85% usage. It seems to be a class loading leak at this point, how can we
>>>>> debug this issue?
>>>>>
>>>>>
>>>>
>>>


Re: How to debug Metaspace exception?

2022-04-14 Thread John Smith
Hi, so I have a dump file. What do I look for?

On Thu, Mar 31, 2022 at 3:28 PM John Smith  wrote:

> Ok so if there's a leak, if I manually stop the job and restart it from
> the UI multiple times, I won't see the issue because because the classes
> are unloaded correctly?
>
>
> On Thu, Mar 31, 2022 at 9:20 AM huweihua  wrote:
>
>>
>> The difference is that manually canceling the job stops the JobMaster,
>> but automatic failover keeps the JobMaster running. But looking on
>> TaskManager, it doesn't make much difference
>>
>>
>> 2022年3月31日 上午4:01,John Smith  写道:
>>
>> Also if I manually cancel and restart the same job over and over is it
>> the same as if flink was restarting a job due to failure?
>>
>> I.e: When I click "Cancel Job" on the UI is the job completely unloaded
>> vs when the job scheduler restarts a job because if whatever reason?
>>
>> Lile this I'll stop and restart the job a few times or maybe I can trick
>> my job to fail and have the scheduler restart it. Ok let me think about
>> this...
>>
>> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>>
>>> So if I run the same jobs in my dev env will I still be able to see the
>>> similar dump?
>>>
>>> I think running the same job in dev should be reproducible, maybe you
>>> can have a try.
>>>
>>>  If not I would have to wait at a low volume time to do it on
>>> production. Aldo if I recall the dump is as big as the JVM memory right so
>>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>>
>>> Yes, JMAP will pause the JVM, the time of pause depends on the size to
>>> dump. you can use "jmap -dump:live" to dump only the reachable objects,
>>> this will take a brief pause
>>>
>>>
>>>
>>> 2022年3月30日 下午9:47,John Smith  写道:
>>>
>>> I have 3 task managers (see config below). There is total of 10 jobs
>>> with 25 slots being used.
>>> The jobs are 100% ETL I.e; They load Json, transform it and push it to
>>> JDBC, only 1 job of the 10 is pushing to Apache Ignite cluster.
>>>
>>> FOR JMAP. I know that it will pause the task manager. So if I run the
>>> same jobs in my dev env will I still be able to see the similar dump? I I
>>> assume so. If not I would have to wait at a low volume time to do it on
>>> production. Aldo if I recall the dump is as big as the JVM memory right so
>>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>>
>>>
>>> # Operating system has 16GB total.
>>> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>>>
>>> cluster.evenly-spread-out-slots: true
>>>
>>> taskmanager.memory.flink.size: 10240m
>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>> taskmanager.numberOfTaskSlots: 16
>>> parallelism.default: 1
>>>
>>> high-availability: zookeeper
>>> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
>>> high-availability.zookeeper.quorum: ...
>>> high-availability.zookeeper.path.root: /flink_1_14
>>> high-availability.cluster-id: /flink_1_14_cluster_0001
>>>
>>> web.upload.dir: /mnt/flink/uploads/flink_1_14
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
>>> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>>>
>>> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:
>>>
>>>> Hi, John
>>>>
>>>> Could you tell us you application scenario? Is it a flink session
>>>> cluster with a lot of jobs?
>>>>
>>>> Maybe you can try to dump the memory with jmap and use tools such as
>>>> MAT to analyze whether there are abnormal classes and classloaders
>>>>
>>>>
>>>> > 2022年3月30日 上午6:09,John Smith  写道:
>>>> >
>>>> > Hi running 1.14.4
>>>> >
>>>> > My tasks manager still fails with java.lang.OutOfMemoryError:
>>>> Metaspace. The metaspace out-of-memory error has occurred. This can mean
>>>> two things: either the job requires a larger size of JVM metaspace to load
>>>> classes or there is a class loading leak.
>>>> >
>>>> > I have 2GB of metaspace configed
>>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>>> >
>>>> > But the task nodes still fail.
>>>> >
>>>> > When looking at the UI metrics, the metaspace starts low. Now I see
>>>> 85% usage. It seems to be a class loading leak at this point, how can we
>>>> debug this issue?
>>>>
>>>>
>>>
>>


Re: How to debug Metaspace exception?

2022-03-31 Thread John Smith
Ok so if there's a leak, if I manually stop the job and restart it from the
UI multiple times, I won't see the issue because because the classes are
unloaded correctly?


On Thu, Mar 31, 2022 at 9:20 AM huweihua  wrote:

>
> The difference is that manually canceling the job stops the JobMaster, but
> automatic failover keeps the JobMaster running. But looking on TaskManager,
> it doesn't make much difference
>
>
> 2022年3月31日 上午4:01,John Smith  写道:
>
> Also if I manually cancel and restart the same job over and over is it the
> same as if flink was restarting a job due to failure?
>
> I.e: When I click "Cancel Job" on the UI is the job completely unloaded vs
> when the job scheduler restarts a job because if whatever reason?
>
> Lile this I'll stop and restart the job a few times or maybe I can trick
> my job to fail and have the scheduler restart it. Ok let me think about
> this...
>
> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>
>> So if I run the same jobs in my dev env will I still be able to see the
>> similar dump?
>>
>> I think running the same job in dev should be reproducible, maybe you can
>> have a try.
>>
>>  If not I would have to wait at a low volume time to do it on production.
>> Aldo if I recall the dump is as big as the JVM memory right so if I have
>> 10GB configed for the JVM the dump will be 10GB file?
>>
>> Yes, JMAP will pause the JVM, the time of pause depends on the size to
>> dump. you can use "jmap -dump:live" to dump only the reachable objects,
>> this will take a brief pause
>>
>>
>>
>> 2022年3月30日 下午9:47,John Smith  写道:
>>
>> I have 3 task managers (see config below). There is total of 10 jobs with
>> 25 slots being used.
>> The jobs are 100% ETL I.e; They load Json, transform it and push it to
>> JDBC, only 1 job of the 10 is pushing to Apache Ignite cluster.
>>
>> FOR JMAP. I know that it will pause the task manager. So if I run the
>> same jobs in my dev env will I still be able to see the similar dump? I I
>> assume so. If not I would have to wait at a low volume time to do it on
>> production. Aldo if I recall the dump is as big as the JVM memory right so
>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>
>>
>> # Operating system has 16GB total.
>> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>>
>> cluster.evenly-spread-out-slots: true
>>
>> taskmanager.memory.flink.size: 10240m
>> taskmanager.memory.jvm-metaspace.size: 2048m
>> taskmanager.numberOfTaskSlots: 16
>> parallelism.default: 1
>>
>> high-availability: zookeeper
>> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
>> high-availability.zookeeper.quorum: ...
>> high-availability.zookeeper.path.root: /flink_1_14
>> high-availability.cluster-id: /flink_1_14_cluster_0001
>>
>> web.upload.dir: /mnt/flink/uploads/flink_1_14
>>
>> state.backend: rocksdb
>> state.backend.incremental: true
>> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
>> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>>
>> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:
>>
>>> Hi, John
>>>
>>> Could you tell us you application scenario? Is it a flink session
>>> cluster with a lot of jobs?
>>>
>>> Maybe you can try to dump the memory with jmap and use tools such as MAT
>>> to analyze whether there are abnormal classes and classloaders
>>>
>>>
>>> > 2022年3月30日 上午6:09,John Smith  写道:
>>> >
>>> > Hi running 1.14.4
>>> >
>>> > My tasks manager still fails with java.lang.OutOfMemoryError:
>>> Metaspace. The metaspace out-of-memory error has occurred. This can mean
>>> two things: either the job requires a larger size of JVM metaspace to load
>>> classes or there is a class loading leak.
>>> >
>>> > I have 2GB of metaspace configed
>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>> >
>>> > But the task nodes still fail.
>>> >
>>> > When looking at the UI metrics, the metaspace starts low. Now I see
>>> 85% usage. It seems to be a class loading leak at this point, how can we
>>> debug this issue?
>>>
>>>
>>
>


Re: How to debug Metaspace exception?

2022-03-30 Thread John Smith
Also if I manually cancel and restart the same job over and over is it the
same as if flink was restarting a job due to failure?

I.e: When I click "Cancel Job" on the UI is the job completely unloaded vs
when the job scheduler restarts a job because if whatever reason?

Lile this I'll stop and restart the job a few times or maybe I can trick my
job to fail and have the scheduler restart it. Ok let me think about this...

On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:

> So if I run the same jobs in my dev env will I still be able to see the
> similar dump?
>
> I think running the same job in dev should be reproducible, maybe you can
> have a try.
>
>  If not I would have to wait at a low volume time to do it on production.
> Aldo if I recall the dump is as big as the JVM memory right so if I have
> 10GB configed for the JVM the dump will be 10GB file?
>
> Yes, JMAP will pause the JVM, the time of pause depends on the size to
> dump. you can use "jmap -dump:live" to dump only the reachable objects,
> this will take a brief pause
>
>
>
> 2022年3月30日 下午9:47,John Smith  写道:
>
> I have 3 task managers (see config below). There is total of 10 jobs with
> 25 slots being used.
> The jobs are 100% ETL I.e; They load Json, transform it and push it to
> JDBC, only 1 job of the 10 is pushing to Apache Ignite cluster.
>
> FOR JMAP. I know that it will pause the task manager. So if I run the same
> jobs in my dev env will I still be able to see the similar dump? I I assume
> so. If not I would have to wait at a low volume time to do it on
> production. Aldo if I recall the dump is as big as the JVM memory right so
> if I have 10GB configed for the JVM the dump will be 10GB file?
>
>
> # Operating system has 16GB total.
> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>
> cluster.evenly-spread-out-slots: true
>
> taskmanager.memory.flink.size: 10240m
> taskmanager.memory.jvm-metaspace.size: 2048m
> taskmanager.numberOfTaskSlots: 16
> parallelism.default: 1
>
> high-availability: zookeeper
> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
> high-availability.zookeeper.quorum: ...
> high-availability.zookeeper.path.root: /flink_1_14
> high-availability.cluster-id: /flink_1_14_cluster_0001
>
> web.upload.dir: /mnt/flink/uploads/flink_1_14
>
> state.backend: rocksdb
> state.backend.incremental: true
> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>
> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:
>
>> Hi, John
>>
>> Could you tell us you application scenario? Is it a flink session cluster
>> with a lot of jobs?
>>
>> Maybe you can try to dump the memory with jmap and use tools such as MAT
>> to analyze whether there are abnormal classes and classloaders
>>
>>
>> > 2022年3月30日 上午6:09,John Smith  写道:
>> >
>> > Hi running 1.14.4
>> >
>> > My tasks manager still fails with java.lang.OutOfMemoryError:
>> Metaspace. The metaspace out-of-memory error has occurred. This can mean
>> two things: either the job requires a larger size of JVM metaspace to load
>> classes or there is a class loading leak.
>> >
>> > I have 2GB of metaspace configed taskmanager.memory.jvm-metaspace.size:
>> 2048m
>> >
>> > But the task nodes still fail.
>> >
>> > When looking at the UI metrics, the metaspace starts low. Now I see 85%
>> usage. It seems to be a class loading leak at this point, how can we debug
>> this issue?
>>
>>
>


Re: How to debug Metaspace exception?

2022-03-30 Thread John Smith
I have 3 task managers (see config below). There is total of 10 jobs with
25 slots being used.
The jobs are 100% ETL I.e; They load Json, transform it and push it to
JDBC, only 1 job of the 10 is pushing to Apache Ignite cluster.

FOR JMAP. I know that it will pause the task manager. So if I run the same
jobs in my dev env will I still be able to see the similar dump? I I assume
so. If not I would have to wait at a low volume time to do it on
production. Aldo if I recall the dump is as big as the JVM memory right so
if I have 10GB configed for the JVM the dump will be 10GB file?


# Operating system has 16GB total.
env.ssh.opts: -l flink -oStrictHostKeyChecking=no

cluster.evenly-spread-out-slots: true

taskmanager.memory.flink.size: 10240m
taskmanager.memory.jvm-metaspace.size: 2048m
taskmanager.numberOfTaskSlots: 16
parallelism.default: 1

high-availability: zookeeper
high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
high-availability.zookeeper.quorum: ...
high-availability.zookeeper.path.root: /flink_1_14
high-availability.cluster-id: /flink_1_14_cluster_0001

web.upload.dir: /mnt/flink/uploads/flink_1_14

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14

On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:

> Hi, John
>
> Could you tell us you application scenario? Is it a flink session cluster
> with a lot of jobs?
>
> Maybe you can try to dump the memory with jmap and use tools such as MAT
> to analyze whether there are abnormal classes and classloaders
>
>
> > 2022年3月30日 上午6:09,John Smith  写道:
> >
> > Hi running 1.14.4
> >
> > My tasks manager still fails with java.lang.OutOfMemoryError: Metaspace.
> The metaspace out-of-memory error has occurred. This can mean two things:
> either the job requires a larger size of JVM metaspace to load classes or
> there is a class loading leak.
> >
> > I have 2GB of metaspace configed taskmanager.memory.jvm-metaspace.size:
> 2048m
> >
> > But the task nodes still fail.
> >
> > When looking at the UI metrics, the metaspace starts low. Now I see 85%
> usage. It seems to be a class loading leak at this point, how can we debug
> this issue?
>
>


How to debug Metaspace exception?

2022-03-29 Thread John Smith
Hi running 1.14.4

My tasks manager still fails with java.lang.OutOfMemoryError: Metaspace.
The metaspace out-of-memory error has occurred. This can mean two things:
either the job requires a larger size of JVM metaspace to load classes or
there is a class loading leak.

I have 2GB of metaspace configed taskmanager.memory.jvm-metaspace.size:
2048m

But the task nodes still fail.

When looking at the UI metrics, the metaspace starts low. Now I see 85%
usage. It seems to be a class loading leak at this point, how can we debug
this issue?


Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, I get that but I want to output that key so I can store it in Elastic
grouped by the minute.

I had explained with data examples above. But just to be sure

Lets pretends the current WALL time is 2022-02-14T11:38:01.123Z and I get
the bellow clicks

event time here (ignored/not read)|cnn.com|/some-article
event time here (ignored/not read)|cnn.com|/some-article
event time here (ignored/not read)|cnn.com|/another-article
event time here (ignored/not read)|cnn.com|/some-article

The output should be...

2022-02-14T11:38:00.000Z (this is the wall time rounded to the minute)|
cnn.com|some-article  count = 3
2022-02-14T11:38:00.000Z( this is the wall time rounded to the minute)|
cnn.com|another-article  count = 1





On Mon, Feb 14, 2022 at 10:08 AM Ali Bahadir Zeybek 
wrote:

> Hello John,
>
> That is what exactly the window operator does for you. Can you please
> check the
> documentation[1] and let us know what part of the window operator alone
> does
> not suffice for the use case?
>
> Sincerely,
>
> Ali
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows
>
> On Mon, Feb 14, 2022 at 4:03 PM John Smith  wrote:
>
>> Because I want to group them for the last X minutes. In this case last 1
>> minute.
>>
>> On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek 
>> wrote:
>>
>>> Hello John,
>>>
>>> Then may I ask you why you need to use a time attribute as part of your
>>> key?
>>> Why not just key by the fields like `mydomain.com` and `some-article`
>>> in your
>>> example and use only window operator for grouping elements based on time?
>>>
>>> Sincerely,
>>>
>>> Ali
>>>
>>> On Mon, Feb 14, 2022 at 3:55 PM John Smith 
>>> wrote:
>>>
>>>> Hi, thanks. As previously mentioned, processing time. So I
>>>> regardless when the event was generated I want to count all events I have
>>>> right now (as soon as they are seen by the flink job).
>>>>
>>>> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
>>>> wrote:
>>>>
>>>>> Hello John,
>>>>>
>>>>> Currently you are grouping the elements two times based on some time
>>>>> attribute, one while keying - with event time - and one while
>>>>> windowing - with
>>>>> processing time. Therefore, the windowing mechanism produces a new
>>>>> window
>>>>> computation when you see an element with the same key but arrived
>>>>> later from
>>>>> the previous window start and end timestamps. Can you please clarify
>>>>> with
>>>>> which notion of time you would like to handle the stream of data?
>>>>>
>>>>> Sincerely,
>>>>>
>>>>> Ali
>>>>>
>>>>> On Fri, Feb 11, 2022 at 6:43 PM John Smith 
>>>>> wrote:
>>>>>
>>>>>> Ok I used the method suggested by Ali. The error is gone. But now I
>>>>>> see multiple counts emitted for the same key...
>>>>>>
>>>>>> DataStream slStream = env.fromSource(kafkaSource, 
>>>>>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>>>> .uid(kafkaTopic).name(kafkaTopic)
>>>>>> .setParallelism(kafkaParallelism)
>>>>>> .flatMap(new MapToMyEvent("my-event", windowSizeMins, 
>>>>>> "message")) <-- Timestamp in GMT created here rounded to the closest 
>>>>>> minute down.
>>>>>> .uid("map-json-logs").name("map-json-logs");
>>>>>>
>>>>>> slStream.keyBy(new MinutesKeySelector())
>>>>>> 
>>>>>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
>>>>>> < Tumbling window of 1 minute.
>>>>>>
>>>>>>
>>>>>>
>>>>>> So below you will see a new count was emitted at 16:51 and 16:55
>>>>>>
>>>>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>>>>>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>>>>>> mydomain.com","uri":"/some-article","count":3542}
>>>>>> -
>>>>>> {"countId":"2022-02-1

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Because I want to group them for the last X minutes. In this case last 1
minute.

On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek 
wrote:

> Hello John,
>
> Then may I ask you why you need to use a time attribute as part of your
> key?
> Why not just key by the fields like `mydomain.com` and `some-article` in
> your
> example and use only window operator for grouping elements based on time?
>
> Sincerely,
>
> Ali
>
> On Mon, Feb 14, 2022 at 3:55 PM John Smith  wrote:
>
>> Hi, thanks. As previously mentioned, processing time. So I
>> regardless when the event was generated I want to count all events I have
>> right now (as soon as they are seen by the flink job).
>>
>> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
>> wrote:
>>
>>> Hello John,
>>>
>>> Currently you are grouping the elements two times based on some time
>>> attribute, one while keying - with event time - and one while windowing
>>> - with
>>> processing time. Therefore, the windowing mechanism produces a new window
>>> computation when you see an element with the same key but arrived later
>>> from
>>> the previous window start and end timestamps. Can you please clarify with
>>> which notion of time you would like to handle the stream of data?
>>>
>>> Sincerely,
>>>
>>> Ali
>>>
>>> On Fri, Feb 11, 2022 at 6:43 PM John Smith 
>>> wrote:
>>>
>>>> Ok I used the method suggested by Ali. The error is gone. But now I see
>>>> multiple counts emitted for the same key...
>>>>
>>>> DataStream slStream = env.fromSource(kafkaSource, 
>>>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>> .uid(kafkaTopic).name(kafkaTopic)
>>>> .setParallelism(kafkaParallelism)
>>>> .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) 
>>>> <-- Timestamp in GMT created here rounded to the closest minute down.
>>>> .uid("map-json-logs").name("map-json-logs");
>>>>
>>>> slStream.keyBy(new MinutesKeySelector())
>>>> 
>>>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
>>>> < Tumbling window of 1 minute.
>>>>
>>>>
>>>>
>>>> So below you will see a new count was emitted at 16:51 and 16:55
>>>>
>>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>>>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>>>> mydomain.com","uri":"/some-article","count":3542}
>>>> -
>>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>>> mydomain.com","uri":"/some-article","count":16503}
>>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>>>> mydomain.com","uri":"/some-article","count":70}
>>>> -
>>>>
>>>> {"countId":"2022-02-11T16:52:00Z|mydomain.com
>>>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
>>>> mydomain.com","uri":"/some-article","count":16037}
>>>> {"countId":"2022-02-11T16:53:00Z|mydomain.com
>>>> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
>>>> mydomain.com","uri":"/some-article","count":18679}
>>>> {"countId":"2022-02-11T16:54:00Z|mydomain.com
>>>> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
>>>> mydomain.com","uri":"/some-article","count":17697}
>>>> -
>>>>
>>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>>>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>>>> mydomain.com","uri":"/some-article","count":18066}
>>>> {"countId":"2022-02-11T16:

Re: How to proper hashCode() for keys.

2022-02-14 Thread John Smith
Hi, thanks. As previously mentioned, processing time. So I regardless when
the event was generated I want to count all events I have right now (as
soon as they are seen by the flink job).

On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek 
wrote:

> Hello John,
>
> Currently you are grouping the elements two times based on some time
> attribute, one while keying - with event time - and one while windowing -
> with
> processing time. Therefore, the windowing mechanism produces a new window
> computation when you see an element with the same key but arrived later
> from
> the previous window start and end timestamps. Can you please clarify with
> which notion of time you would like to handle the stream of data?
>
> Sincerely,
>
> Ali
>
> On Fri, Feb 11, 2022 at 6:43 PM John Smith  wrote:
>
>> Ok I used the method suggested by Ali. The error is gone. But now I see
>> multiple counts emitted for the same key...
>>
>> DataStream slStream = env.fromSource(kafkaSource, 
>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>> .uid(kafkaTopic).name(kafkaTopic)
>> .setParallelism(kafkaParallelism)
>> .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message")) 
>> <-- Timestamp in GMT created here rounded to the closest minute down.
>> .uid("map-json-logs").name("map-json-logs");
>>
>> slStream.keyBy(new MinutesKeySelector())
>> 
>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) 
>> < Tumbling window of 1 minute.
>>
>>
>>
>> So below you will see a new count was emitted at 16:51 and 16:55
>>
>> {"countId":"2022-02-11T16:50:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":3542}
>> -
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":16503}
>> {"countId":"2022-02-11T16:51:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":70}
>> -
>>
>> {"countId":"2022-02-11T16:52:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":16037}
>> {"countId":"2022-02-11T16:53:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":18679}
>> {"countId":"2022-02-11T16:54:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":17697}
>> -
>>
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":18066}
>> {"countId":"2022-02-11T16:55:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":58}
>> -
>> {"countId":"2022-02-11T16:56:00Z|mydomain.com
>> |/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
>> mydomain.com","uri":"/some-article","count":17489}
>>
>>
>>
>>
>> On Mon, Feb 7, 2022 at 12:44 PM John Smith 
>> wrote:
>>
>>> Ok I think Ali's solution makes the most sense to me. I'll try it and
>>> let you know.
>>>
>>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:
>>>
>>>> Hi John,
>>>>
>>>> your getKey() implementation shows that it is not deterministic, since
>>>> calling it with the same click instance multiple times will return
>>>> different keys. For example a call at 12:0

Re: How to proper hashCode() for keys.

2022-02-11 Thread John Smith
Ok I used the method suggested by Ali. The error is gone. But now I see
multiple counts emitted for the same key...

DataStream slStream = env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(), "Kafka Source")
.uid(kafkaTopic).name(kafkaTopic)
.setParallelism(kafkaParallelism)
.flatMap(new MapToMyEvent("my-event", windowSizeMins,
"message")) <-- Timestamp in GMT created here rounded to the
closest minute down.
.uid("map-json-logs").name("map-json-logs");

slStream.keyBy(new MinutesKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))
< Tumbling window of 1 minute.



So below you will see a new count was emitted at 16:51 and 16:55

{"countId":"2022-02-11T16:50:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
mydomain.com","uri":"/some-article","count":3542}
-
{"countId":"2022-02-11T16:51:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
mydomain.com","uri":"/some-article","count":16503}
{"countId":"2022-02-11T16:51:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
mydomain.com","uri":"/some-article","count":70}
-

{"countId":"2022-02-11T16:52:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
mydomain.com","uri":"/some-article","count":16037}
{"countId":"2022-02-11T16:53:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
mydomain.com","uri":"/some-article","count":18679}
{"countId":"2022-02-11T16:54:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
mydomain.com","uri":"/some-article","count":17697}
-

{"countId":"2022-02-11T16:55:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
mydomain.com","uri":"/some-article","count":18066}
{"countId":"2022-02-11T16:55:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
mydomain.com","uri":"/some-article","count":58}
-
{"countId":"2022-02-11T16:56:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
mydomain.com","uri":"/some-article","count":17489}




On Mon, Feb 7, 2022 at 12:44 PM John Smith  wrote:

> Ok I think Ali's solution makes the most sense to me. I'll try it and let
> you know.
>
> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:
>
>> Hi John,
>>
>> your getKey() implementation shows that it is not deterministic, since
>> calling it with the same click instance multiple times will return
>> different keys. For example a call at 12:01:59.950 and a call at
>> 12:02:00.050 with the same click instance will return two different keys:
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>
>> best regards
>> Jing
>>
>> On Mon, Feb 7, 2022 at 5:07 PM John Smith  wrote:
>>
>>> Maybe there's a misunderstanding. But basically I want to do clickstream
>>> count for a given "url" and for simplicity and accuracy of the count base
>>> it on processing time (event time doesn't matter as long as I get a total
>>> of clicks at that given processing time)
>>>
>>> So regardless of the event time. I want all clicks for the current
>>> processing time rounded to the minute per link.
>>>
>>> So, if now was 2022-04-07T12:01:00.000Z
>>>
>>> Then I would want the following result...
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>> 
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>> 

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Ok I think Ali's solution makes the most sense to me. I'll try it and let
you know.

On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:

> Hi John,
>
> your getKey() implementation shows that it is not deterministic, since
> calling it with the same click instance multiple times will return
> different keys. For example a call at 12:01:59.950 and a call at
> 12:02:00.050 with the same click instance will return two different keys:
>
> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>
> best regards
> Jing
>
> On Mon, Feb 7, 2022 at 5:07 PM John Smith  wrote:
>
>> Maybe there's a misunderstanding. But basically I want to do clickstream
>> count for a given "url" and for simplicity and accuracy of the count base
>> it on processing time (event time doesn't matter as long as I get a total
>> of clicks at that given processing time)
>>
>> So regardless of the event time. I want all clicks for the current
>> processing time rounded to the minute per link.
>>
>> So, if now was 2022-04-07T12:01:00.000Z
>>
>> Then I would want the following result...
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>> 
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>> And so on...
>>
>> @Override
>> public MyEventCountKey getKey(final MyEvent click) throws Exception
>> {
>> MyEventCountKey key = new MyEventCountKey(
>> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>> click.getDomain(), // cnn.com
>> click.getPath(), // /some-article-name
>> );
>> return key;
>> }
>>
>>
>>
>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>>
>>> The key selector works.
>>>
>>>
>>> No it does not ;) It depends on the system time so it's not
>>> deterministic (you can get different keys for the very same element).
>>>
>>> How do you key a count based on the time. I have taken this from samples
>>>> online.
>>>>
>>>
>>> This is what the windowing is for. You basically want to group / combine
>>> elements per key and event time window [1].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith 
>>> wrote:
>>>
>>>> The key selector works. It only causes an issue if there too many keys
>>>> produced in one shot. For example of 100 "same" keys are produced for that
>>>> 1 minutes it's ok. But if 101 are produced the error happens.
>>>>
>>>>
>>>> If you look at the reproducer at least that's what's hapenning
>>>>
>>>> How do you key a count based on the time. I have taken this from
>>>> samples online.
>>>>
>>>> The key is that particular time for that particular URL path.
>>>>
>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>>>
>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
>>>> wrote:
>>>>
>>>>> Your Key selector doesn't need to implement hashCode, but given the
>>>>> same object it has to return the same key.
>>>>> In your reproducer the returned key will have different timestamps,
>>>>> and since the timestamp is included in the hashCode, they will be 
>>>>> different
>>>>> each time.
>>>>>
>>>>> On 07/02/2022 14:50, John Smith wrote:
>>>>>
>>>>> I don't get it? I provided the reproducer. I implemented the interface
>>>>> to Key selector it needs hashcode and equals as well?
>>>>>
>>>>> I'm attempting to do click stream. So the key is based on processing
>>>>> date/time rounded to the minute + domain name + path
>>>>>
>>>>> So these should be valid below?
>>>>>
>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>>> 2022-01-01T10:

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
Maybe there's a misunderstanding. But basically I want to do clickstream
count for a given "url" and for simplicity and accuracy of the count base
it on processing time (event time doesn't matter as long as I get a total
of clicks at that given processing time)

So regardless of the event time. I want all clicks for the current
processing time rounded to the minute per link.

So, if now was 2022-04-07T12:01:00.000Z

Then I would want the following result...

2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15

2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
And so on...

@Override
public MyEventCountKey getKey(final MyEvent click) throws Exception
{
MyEventCountKey key = new MyEventCountKey(
Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")), ChronoField.
MINUTE_OF_HOUR, windowSizeMins)).toString(),
click.getDomain(), // cnn.com
click.getPath(), // /some-article-name
);
return key;
}



On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:

> The key selector works.
>
>
> No it does not ;) It depends on the system time so it's not deterministic
> (you can get different keys for the very same element).
>
> How do you key a count based on the time. I have taken this from samples
>> online.
>>
>
> This is what the windowing is for. You basically want to group / combine
> elements per key and event time window [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>
> Best,
> D.
>
> On Mon, Feb 7, 2022 at 3:44 PM John Smith  wrote:
>
>> The key selector works. It only causes an issue if there too many keys
>> produced in one shot. For example of 100 "same" keys are produced for that
>> 1 minutes it's ok. But if 101 are produced the error happens.
>>
>>
>> If you look at the reproducer at least that's what's hapenning
>>
>> How do you key a count based on the time. I have taken this from samples
>> online.
>>
>> The key is that particular time for that particular URL path.
>>
>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00
>>
>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
>> wrote:
>>
>>> Your Key selector doesn't need to implement hashCode, but given the same
>>> object it has to return the same key.
>>> In your reproducer the returned key will have different timestamps, and
>>> since the timestamp is included in the hashCode, they will be different
>>> each time.
>>>
>>> On 07/02/2022 14:50, John Smith wrote:
>>>
>>> I don't get it? I provided the reproducer. I implemented the interface
>>> to Key selector it needs hashcode and equals as well?
>>>
>>> I'm attempting to do click stream. So the key is based on processing
>>> date/time rounded to the minute + domain name + path
>>>
>>> So these should be valid below?
>>>
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>> 2022-01-01T10:02:00 + cnn.com + /article1
>>>
>>> 2022-01-01T10:02:00 + cnn.com + /article2
>>>
>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>> 2022-01-01T10:03:00 + cnn.com + /article1
>>>
>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>> 2022-01-01T10:03:00 + cnn.com + /article3
>>>
>>> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
>>> wrote:
>>>
>>>> Don't KeySelectors also need to be deterministic?
>>>>
>>>> * The {@link KeySelector} allows to use deterministic objects for 
>>>> operations such as reduce,* reduceGroup, join, coGroup, etc. *If invoked 
>>>> multiple times on the same object, the returned key*** must be the same.*
>>>>
>>>>
>>>> On 04/02/2022 18:25, John Smith wrote:
>>>>
>>>> Hi Francesco,  here is the reproducer:
>>>> https://github.com/javadevmtl/flink-key-reproducer
>>>>
>>>> So, essentially it looks like when there's a high influx of records
>>>> produced from the source that the Exception is thrown.
>>>>
>>>> The key is generated by 3 values: date/time rounded to the minute and 2
>>>> strings.
>>>> So you will see keys as follows...
>>>> 2022-02-04T17:20:00Z|foo|bar
>>>> 2022-02-04T17:21:00Z|fo

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
The key selector works. It only causes an issue if there too many keys
produced in one shot. For example of 100 "same" keys are produced for that
1 minutes it's ok. But if 101 are produced the error happens.


If you look at the reproducer at least that's what's hapenning

How do you key a count based on the time. I have taken this from samples
online.

The key is that particular time for that particular URL path.

So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00

On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, 
wrote:

> Your Key selector doesn't need to implement hashCode, but given the same
> object it has to return the same key.
> In your reproducer the returned key will have different timestamps, and
> since the timestamp is included in the hashCode, they will be different
> each time.
>
> On 07/02/2022 14:50, John Smith wrote:
>
> I don't get it? I provided the reproducer. I implemented the interface to
> Key selector it needs hashcode and equals as well?
>
> I'm attempting to do click stream. So the key is based on processing
> date/time rounded to the minute + domain name + path
>
> So these should be valid below?
>
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
> 2022-01-01T10:02:00 + cnn.com + /article1
>
> 2022-01-01T10:02:00 + cnn.com + /article2
>
> 2022-01-01T10:03:00 + cnn.com + /article1
> 2022-01-01T10:03:00 + cnn.com + /article1
>
> 2022-01-01T10:03:00 + cnn.com + /article3
> 2022-01-01T10:03:00 + cnn.com + /article3
>
> On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
> wrote:
>
>> Don't KeySelectors also need to be deterministic?
>>
>> * The {@link KeySelector} allows to use deterministic objects for operations 
>> such as reduce,* reduceGroup, join, coGroup, etc. *If invoked multiple times 
>> on the same object, the returned key*** must be the same.*
>>
>>
>> On 04/02/2022 18:25, John Smith wrote:
>>
>> Hi Francesco,  here is the reproducer:
>> https://github.com/javadevmtl/flink-key-reproducer
>>
>> So, essentially it looks like when there's a high influx of records
>> produced from the source that the Exception is thrown.
>>
>> The key is generated by 3 values: date/time rounded to the minute and 2
>> strings.
>> So you will see keys as follows...
>> 2022-02-04T17:20:00Z|foo|bar
>> 2022-02-04T17:21:00Z|foo|bar
>> 2022-02-04T17:22:00Z|foo|bar
>>
>> The reproducer has a custom source that basically produces a record in a
>> loop and sleeps for a specified period of milliseconds 100ms in this case.
>> The lower the sleep delay the faster records are produced the more
>> chances the exception is thrown. With a 100ms delay it's always thrown.
>> Setting a 2000 to 3000ms will guarantee it to work.
>> The original job uses a Kafka Source so it should technically be able to
>> handle even a couple thousand records per second.
>>
>>
>> On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:
>>
>>> Ok it's not my data either. I think it may be a volume issue. I have
>>> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>>>
>>>
>>>
>>> On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:
>>>
>>>> Ok so I tried to create a reproducer but I couldn't reproduce it. But
>>>> the actual job once in a while throws that error. So I'm wondering if maybe
>>>> one of the records that comes in is not valid, though I do validate prior
>>>> to getting to the key and window operators.
>>>>
>>>> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>>>>
>>>>> Actually maybe not because with PrintSinkFunction it ran for a bit and
>>>>> then it threw the error.
>>>>>
>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith 
>>>>> wrote:
>>>>>
>>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>>
>>>>>> If I use PrintSinkFunction then I get no error and my stats print as
>>>>>> expected.
>>>>>>
>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>>> france...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> your hash code and equals seems correct. Can you post a minimum
>>>>>>> stream pipeline reproducer using this class?
>>>>>>>
>>>>>>> FG
>>>>>>>
>>>>>>> On Tue, Feb 1, 2022 a

Re: How to proper hashCode() for keys.

2022-02-07 Thread John Smith
I don't get it? I provided the reproducer. I implemented the interface to
Key selector it needs hashcode and equals as well?

I'm attempting to do click stream. So the key is based on processing
date/time rounded to the minute + domain name + path

So these should be valid below?

2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1
2022-01-01T10:02:00 + cnn.com + /article1

2022-01-01T10:02:00 + cnn.com + /article2

2022-01-01T10:03:00 + cnn.com + /article1
2022-01-01T10:03:00 + cnn.com + /article1

2022-01-01T10:03:00 + cnn.com + /article3
2022-01-01T10:03:00 + cnn.com + /article3

On Mon., Feb. 7, 2022, 2:53 a.m. Chesnay Schepler, 
wrote:

> Don't KeySelectors also need to be deterministic?
>
> * The {@link KeySelector} allows to use deterministic objects for operations 
> such as reduce,* reduceGroup, join, coGroup, etc. *If invoked multiple times 
> on the same object, the returned key*** must be the same.*
>
>
> On 04/02/2022 18:25, John Smith wrote:
>
> Hi Francesco,  here is the reproducer:
> https://github.com/javadevmtl/flink-key-reproducer
>
> So, essentially it looks like when there's a high influx of records
> produced from the source that the Exception is thrown.
>
> The key is generated by 3 values: date/time rounded to the minute and 2
> strings.
> So you will see keys as follows...
> 2022-02-04T17:20:00Z|foo|bar
> 2022-02-04T17:21:00Z|foo|bar
> 2022-02-04T17:22:00Z|foo|bar
>
> The reproducer has a custom source that basically produces a record in a
> loop and sleeps for a specified period of milliseconds 100ms in this case.
> The lower the sleep delay the faster records are produced the more chances
> the exception is thrown. With a 100ms delay it's always thrown. Setting a
> 2000 to 3000ms will guarantee it to work.
> The original job uses a Kafka Source so it should technically be able to
> handle even a couple thousand records per second.
>
>
> On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:
>
>> Ok it's not my data either. I think it may be a volume issue. I have
>> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>>
>>
>>
>> On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:
>>
>>> Ok so I tried to create a reproducer but I couldn't reproduce it. But
>>> the actual job once in a while throws that error. So I'm wondering if maybe
>>> one of the records that comes in is not valid, though I do validate prior
>>> to getting to the key and window operators.
>>>
>>> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>>>
>>>> Actually maybe not because with PrintSinkFunction it ran for a bit and
>>>> then it threw the error.
>>>>
>>>> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>>>>
>>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>>
>>>>> If I use PrintSinkFunction then I get no error and my stats print as
>>>>> expected.
>>>>>
>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>>> france...@ververica.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> your hash code and equals seems correct. Can you post a minimum
>>>>>> stream pipeline reproducer using this class?
>>>>>>
>>>>>> FG
>>>>>>
>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not
>>>>>>> in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're 
>>>>>>> directly
>>>>>>> using low level state access APIs, this is most likely caused by
>>>>>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>>>>>
>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>
>>>>>>> public final class MyEventCountKey {
>>>>>>> private final String countDateTime;private final String domain; 
>>>>>>>private final String event;public MyEventCountKey(final String 
>>>>>>> countDateTime, final String domain, final String event) {
>>>>>>> this.countDateTime = countDateTime;this.domain = 
>>>>>>> domain;this.event = event;}
>>>>>>>
>>>>>>> public String getCountDateTime() {
>>>>>>> return countDateTim

Re: How to proper hashCode() for keys.

2022-02-04 Thread John Smith
Hi Francesco,  here is the reproducer:
https://github.com/javadevmtl/flink-key-reproducer

So, essentially it looks like when there's a high influx of records
produced from the source that the Exception is thrown.

The key is generated by 3 values: date/time rounded to the minute and 2
strings.
So you will see keys as follows...
2022-02-04T17:20:00Z|foo|bar
2022-02-04T17:21:00Z|foo|bar
2022-02-04T17:22:00Z|foo|bar

The reproducer has a custom source that basically produces a record in a
loop and sleeps for a specified period of milliseconds 100ms in this case.
The lower the sleep delay the faster records are produced the more chances
the exception is thrown. With a 100ms delay it's always thrown. Setting a
2000 to 3000ms will guarantee it to work.
The original job uses a Kafka Source so it should technically be able to
handle even a couple thousand records per second.


On Thu, 3 Feb 2022 at 16:41, John Smith  wrote:

> Ok it's not my data either. I think it may be a volume issue. I have
> managed to consistently reproduce the error. I'll upload a reproducer ASAP.
>
>
>
> On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:
>
>> Ok so I tried to create a reproducer but I couldn't reproduce it. But the
>> actual job once in a while throws that error. So I'm wondering if maybe one
>> of the records that comes in is not valid, though I do validate prior to
>> getting to the key and window operators.
>>
>> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>>
>>> Actually maybe not because with PrintSinkFunction it ran for a bit and
>>> then it threw the error.
>>>
>>> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>>>
>>>> Ok it may be the ElasticSearch connector causing the issue?
>>>>
>>>> If I use PrintSinkFunction then I get no error and my stats print as
>>>> expected.
>>>>
>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>>> france...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>> your hash code and equals seems correct. Can you post a minimum stream
>>>>> pipeline reproducer using this class?
>>>>>
>>>>> FG
>>>>>
>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>>>>> wrote:
>>>>>
>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not
>>>>>> in KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're 
>>>>>> directly
>>>>>> using low level state access APIs, this is most likely caused by
>>>>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>>>>
>>>>>> This is my class, is my hashCode deterministic?
>>>>>>
>>>>>> public final class MyEventCountKey {
>>>>>> private final String countDateTime;
>>>>>> private final String domain;
>>>>>> private final String event;
>>>>>>
>>>>>> public MyEventCountKey(final String countDateTime, final String 
>>>>>> domain, final String event) {
>>>>>> this.countDateTime = countDateTime;
>>>>>> this.domain = domain;
>>>>>> this.event = event;
>>>>>> }
>>>>>>
>>>>>> public String getCountDateTime() {
>>>>>> return countDateTime;
>>>>>> }
>>>>>>
>>>>>> public String getDomain() {
>>>>>> return domain;
>>>>>> }
>>>>>>
>>>>>> public String getEven() {
>>>>>> return event;
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public String toString() {
>>>>>> return countDateTime + "|" + domain + "|" + event;
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public boolean equals(Object o) {
>>>>>> if (this == o) return true;
>>>>>> if (o == null || getClass() != o.getClass()) return false;
>>>>>> MyEventCountKey that = (MyEventCountKey) o;
>>>>>> return countDateTime.equals(that.countDateTime) &&
>>>>>> domain.equals(that.domain) &&
>>>>>> event.equals(that.event);
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public int hashCode() {
>>>>>> final int prime = 31;
>>>>>> int result = 1;
>>>>>> result = prime * result + countDateTime.hashCode();
>>>>>> result = prime * result + domain.hashCode();
>>>>>> result = prime * result +  event.hashCode();
>>>>>> return result;
>>>>>> }
>>>>>> }
>>>>>>
>>>>>>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok it's not my data either. I think it may be a volume issue. I have
managed to consistently reproduce the error. I'll upload a reproducer ASAP.



On Thu, 3 Feb 2022 at 15:37, John Smith  wrote:

> Ok so I tried to create a reproducer but I couldn't reproduce it. But the
> actual job once in a while throws that error. So I'm wondering if maybe one
> of the records that comes in is not valid, though I do validate prior to
> getting to the key and window operators.
>
> On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:
>
>> Actually maybe not because with PrintSinkFunction it ran for a bit and
>> then it threw the error.
>>
>> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>>
>>> Ok it may be the ElasticSearch connector causing the issue?
>>>
>>> If I use PrintSinkFunction then I get no error and my stats print as
>>> expected.
>>>
>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani <
>>> france...@ververica.com> wrote:
>>>
>>>> Hi,
>>>> your hash code and equals seems correct. Can you post a minimum stream
>>>> pipeline reproducer using this class?
>>>>
>>>> FG
>>>>
>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>>>> wrote:
>>>>
>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>>>>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>>>>> using low level state access APIs, this is most likely caused by
>>>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>>>
>>>>> This is my class, is my hashCode deterministic?
>>>>>
>>>>> public final class MyEventCountKey {
>>>>> private final String countDateTime;
>>>>> private final String domain;
>>>>> private final String event;
>>>>>
>>>>> public MyEventCountKey(final String countDateTime, final String 
>>>>> domain, final String event) {
>>>>> this.countDateTime = countDateTime;
>>>>> this.domain = domain;
>>>>> this.event = event;
>>>>> }
>>>>>
>>>>> public String getCountDateTime() {
>>>>> return countDateTime;
>>>>> }
>>>>>
>>>>> public String getDomain() {
>>>>> return domain;
>>>>> }
>>>>>
>>>>> public String getEven() {
>>>>> return event;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public String toString() {
>>>>> return countDateTime + "|" + domain + "|" + event;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public boolean equals(Object o) {
>>>>> if (this == o) return true;
>>>>> if (o == null || getClass() != o.getClass()) return false;
>>>>> MyEventCountKey that = (MyEventCountKey) o;
>>>>> return countDateTime.equals(that.countDateTime) &&
>>>>> domain.equals(that.domain) &&
>>>>> event.equals(that.event);
>>>>> }
>>>>>
>>>>> @Override
>>>>> public int hashCode() {
>>>>> final int prime = 31;
>>>>> int result = 1;
>>>>> result = prime * result + countDateTime.hashCode();
>>>>> result = prime * result + domain.hashCode();
>>>>> result = prime * result +  event.hashCode();
>>>>> return result;
>>>>> }
>>>>> }
>>>>>
>>>>>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok so I tried to create a reproducer but I couldn't reproduce it. But the
actual job once in a while throws that error. So I'm wondering if maybe one
of the records that comes in is not valid, though I do validate prior to
getting to the key and window operators.

On Thu, 3 Feb 2022 at 14:32, John Smith  wrote:

> Actually maybe not because with PrintSinkFunction it ran for a bit and
> then it threw the error.
>
> On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:
>
>> Ok it may be the ElasticSearch connector causing the issue?
>>
>> If I use PrintSinkFunction then I get no error and my stats print as
>> expected.
>>
>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
>> wrote:
>>
>>> Hi,
>>> your hash code and equals seems correct. Can you post a minimum stream
>>> pipeline reproducer using this class?
>>>
>>> FG
>>>
>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith 
>>> wrote:
>>>
>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>>>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>>>> using low level state access APIs, this is most likely caused by
>>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>>
>>>> This is my class, is my hashCode deterministic?
>>>>
>>>> public final class MyEventCountKey {
>>>> private final String countDateTime;
>>>> private final String domain;
>>>> private final String event;
>>>>
>>>> public MyEventCountKey(final String countDateTime, final String 
>>>> domain, final String event) {
>>>> this.countDateTime = countDateTime;
>>>> this.domain = domain;
>>>> this.event = event;
>>>> }
>>>>
>>>> public String getCountDateTime() {
>>>> return countDateTime;
>>>> }
>>>>
>>>> public String getDomain() {
>>>> return domain;
>>>> }
>>>>
>>>> public String getEven() {
>>>> return event;
>>>> }
>>>>
>>>> @Override
>>>> public String toString() {
>>>> return countDateTime + "|" + domain + "|" + event;
>>>> }
>>>>
>>>> @Override
>>>> public boolean equals(Object o) {
>>>> if (this == o) return true;
>>>> if (o == null || getClass() != o.getClass()) return false;
>>>> MyEventCountKey that = (MyEventCountKey) o;
>>>> return countDateTime.equals(that.countDateTime) &&
>>>> domain.equals(that.domain) &&
>>>> event.equals(that.event);
>>>> }
>>>>
>>>> @Override
>>>> public int hashCode() {
>>>> final int prime = 31;
>>>> int result = 1;
>>>> result = prime * result + countDateTime.hashCode();
>>>> result = prime * result + domain.hashCode();
>>>> result = prime * result +  event.hashCode();
>>>> return result;
>>>> }
>>>> }
>>>>
>>>>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Actually maybe not because with PrintSinkFunction it ran for a bit and then
it threw the error.

On Thu, 3 Feb 2022 at 14:24, John Smith  wrote:

> Ok it may be the ElasticSearch connector causing the issue?
>
> If I use PrintSinkFunction then I get no error and my stats print as
> expected.
>
> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
> wrote:
>
>> Hi,
>> your hash code and equals seems correct. Can you post a minimum stream
>> pipeline reproducer using this class?
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:
>>
>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>>> using low level state access APIs, this is most likely caused by
>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>
>>> This is my class, is my hashCode deterministic?
>>>
>>> public final class MyEventCountKey {
>>> private final String countDateTime;
>>> private final String domain;
>>> private final String event;
>>>
>>> public MyEventCountKey(final String countDateTime, final String domain, 
>>> final String event) {
>>> this.countDateTime = countDateTime;
>>> this.domain = domain;
>>> this.event = event;
>>> }
>>>
>>> public String getCountDateTime() {
>>> return countDateTime;
>>> }
>>>
>>> public String getDomain() {
>>> return domain;
>>> }
>>>
>>> public String getEven() {
>>> return event;
>>> }
>>>
>>> @Override
>>> public String toString() {
>>> return countDateTime + "|" + domain + "|" + event;
>>> }
>>>
>>> @Override
>>> public boolean equals(Object o) {
>>> if (this == o) return true;
>>> if (o == null || getClass() != o.getClass()) return false;
>>> MyEventCountKey that = (MyEventCountKey) o;
>>> return countDateTime.equals(that.countDateTime) &&
>>> domain.equals(that.domain) &&
>>> event.equals(that.event);
>>> }
>>>
>>> @Override
>>> public int hashCode() {
>>> final int prime = 31;
>>> int result = 1;
>>> result = prime * result + countDateTime.hashCode();
>>> result = prime * result + domain.hashCode();
>>> result = prime * result +  event.hashCode();
>>> return result;
>>> }
>>> }
>>>
>>>


Re: How to proper hashCode() for keys.

2022-02-03 Thread John Smith
Ok it may be the ElasticSearch connector causing the issue?

If I use PrintSinkFunction then I get no error and my stats print as
expected.

On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani 
wrote:

> Hi,
> your hash code and equals seems correct. Can you post a minimum stream
> pipeline reproducer using this class?
>
> FG
>
> On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:
>
>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>> using low level state access APIs, this is most likely caused by
>> non-deterministic shuffle key (hashCode and equals implementation).
>>
>> This is my class, is my hashCode deterministic?
>>
>> public final class MyEventCountKey {
>> private final String countDateTime;
>> private final String domain;
>> private final String event;
>>
>> public MyEventCountKey(final String countDateTime, final String domain, 
>> final String event) {
>> this.countDateTime = countDateTime;
>> this.domain = domain;
>> this.event = event;
>> }
>>
>> public String getCountDateTime() {
>> return countDateTime;
>> }
>>
>> public String getDomain() {
>> return domain;
>> }
>>
>> public String getEven() {
>> return event;
>> }
>>
>> @Override
>> public String toString() {
>> return countDateTime + "|" + domain + "|" + event;
>> }
>>
>> @Override
>> public boolean equals(Object o) {
>> if (this == o) return true;
>> if (o == null || getClass() != o.getClass()) return false;
>> MyEventCountKey that = (MyEventCountKey) o;
>> return countDateTime.equals(that.countDateTime) &&
>> domain.equals(that.domain) &&
>> event.equals(that.event);
>> }
>>
>> @Override
>> public int hashCode() {
>> final int prime = 31;
>> int result = 1;
>> result = prime * result + countDateTime.hashCode();
>> result = prime * result + domain.hashCode();
>> result = prime * result +  event.hashCode();
>> return result;
>> }
>> }
>>
>>


How to proper hashCode() for keys.

2022-02-01 Thread John Smith
Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
using low level state access APIs, this is most likely caused by
non-deterministic shuffle key (hashCode and equals implementation).

This is my class, is my hashCode deterministic?

public final class MyEventCountKey {
private final String countDateTime;
private final String domain;
private final String event;

public MyEventCountKey(final String countDateTime, final String
domain, final String event) {
this.countDateTime = countDateTime;
this.domain = domain;
this.event = event;
}

public String getCountDateTime() {
return countDateTime;
}

public String getDomain() {
return domain;
}

public String getEven() {
return event;
}

@Override
public String toString() {
return countDateTime + "|" + domain + "|" + event;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MyEventCountKey that = (MyEventCountKey) o;
return countDateTime.equals(that.countDateTime) &&
domain.equals(that.domain) &&
event.equals(that.event);
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + countDateTime.hashCode();
result = prime * result + domain.hashCode();
result = prime * result +  event.hashCode();
return result;
}
}


Re: Show plan in UI not working.

2022-02-01 Thread John Smith
Hi here it is: https://issues.apache.org/jira/browse/FLINK-25812

Finally I think it looks liek a Javascript issue on the Ui rather than the
cluster.

On Tue, 25 Jan 2022 at 02:58, Ingo Bürk  wrote:

> Hi John,
>
> can you please submit this as an issue in JIRA? If you suspect it is
> related to other issues, just make a note of that in the issue as well.
> Thanks!
>
>
> Ingo
>
> On 23.01.22 18:05, John Smith wrote:
> > Just I'm case but in 1.14.x regardless of the job manager is leader or
> > not. Before submitting a job of you click on "Show Plan" it just shows a
> > blank window.
> >
> > I'm assuming it's similar issue as the deserialozation ones.
>


Re: Kafka Consumer Group Name not set if no checkpointing?

2022-02-01 Thread John Smith
Ok that's fine. It's just a thing we are used to that functionality from
basically every other consumer we have flink or not. So we monitor the
offsets for lateness or just to look.

On Tue, 1 Feb 2022 at 03:38, Fabian Paul  wrote:

> Hi John,
>
> You are seeing what I described in my previous mail. The KafkaSource
> only writes consumer offsets to Kafka when a checkpoint is finished
> [1]. Flink does not leverage the offsets stored in Kafka to ensure
> exactly-once processing but it writes the last read offset to Flink's
> internal state that is part of the checkpoint.
>
> Please be also aware that it is not guaranteed that the offsets you
> are seeing with Kafka Explorer reflect the latest record read by the
> KafkaSource because the offset is committed asynchronously and we do
> not ensure that it succeeds.
>
> Maybe you can share with us why you want to inspect the progress of
> the KafkaSource with Kafka Explorer.
>
> Best,
> Fabian
>
>
> [1]
> https://github.com/apache/flink/blob/dea2b10502a493e9d4137e7d94d2dac85d9fa666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L221
>
> On Mon, Jan 31, 2022 at 8:08 PM John Smith  wrote:
> >
> > Hi yes, see below. So it only seems to show the consumer offsets if
> checkpointing is on... That's the only diff I can see between my two
> different jobs. And the moment I enabled it on the job. It started showing
> in Kafka Explorer here: https://www.kafkatool.com/
> >
> > return KafkaSource.builder()
> > .setBootstrapServers(bootstrapServers)
> > .setTopics(topic)
> > .setValueOnlyDeserializer(new VertxJsonSchema())
> > .setGroupId(consumerGroup)
> > .setStartingOffsets(oi)
> > .setProperties(props)
> > .build();
> >
> >
> > On Mon, 31 Jan 2022 at 12:03, Fabian Paul  wrote:
> >>
> >> Hi John,
> >> First I would like to ask you to give us some more information about
> >> how you consume from Kafka with Flink. It is currently recommended to
> >> use the KafkaSource that subsumes the deprecated FlinkKafkaConsumer.
> >>
> >> One thing to already note is that by default Flink does not commit the
> >> Kafka to offset back to the topic because it is not needed from a
> >> Flink perspective and is only supported on a best-effort basis if a
> >> checkpoint completes.
> >>
> >> I am not very familiar with Kafka Explorer but I can imagine it only
> >> shows the consumer group if there are actually committed offsets
> >>
> >> Best,
> >> Fabian
> >>
> >> On Mon, Jan 31, 2022 at 3:41 PM John Smith 
> wrote:
> >> >
> >> > Hi using flink 1.14.3, I have a job that doesn't use checkpointing. I
> have noticed that the Kafka Consumer Group is not set?
> >> >
> >> > I use Kafka Explorer to see all the consumers and when I run the job
> I don't see the consumer group. Finally I decided to enable checkpointing
> and restart the job and finally saw the consumer group.
> >> >
> >> > Is this expected behaviour?
>


Re: Tumbling window apply will not "fire"

2022-01-31 Thread John Smith
ok it's working! Thanks. Just out of curiosity, why is the println of keyBy
printing twice?

On Mon, 31 Jan 2022 at 17:22, John Smith  wrote:

> Oh ok. I was reading here:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/streaming_analytics/#latency-vs-completeness
> and Idid a cut and paste lol
>
> Ok let you know.
>
> On Mon, 31 Jan 2022 at 17:18, Dario Heinisch 
> wrote:
>
>> Then you should be using a process based time window, in your case:
>> TumblingProcessingTimeWindows
>>
>> See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/
>> for more info
>> On 31.01.22 23:13, John Smith wrote:
>>
>> Hi Dario, I don't care about event time I just want to do tumbling window
>> over the "processing time" I.e: count whatever I have in the last 5 minutes.
>>
>> On Mon, 31 Jan 2022 at 17:09, Dario Heinisch 
>> wrote:
>>
>>> Hi John
>>>
>>> This is because you are using event time (TumblingEventTimeWinodws) but
>>> you do not have a event time watermark strategy.
>>> It is also why I opened:
>>> https://issues.apache.org/jira/browse/FLINK-24623 because I feel like
>>> Flink should be throwing an exception in that case
>>> on startup.
>>>
>>> Take a look at the documentation at:
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
>>> which should have everything.
>>>
>>> > In order to work with event time, Flink needs to know the events
>>> timestamps, meaning each element in the stream needs to have its event
>>> timestamp assigned. This is usually done by accessing/extracting the
>>> timestamp from > some field in the element by using a TimestampAssigner.
>>> > Timestamp assignment goes hand-in-hand with generating watermarks,
>>> which tell the system about progress in event time. You can configure this
>>> by specifying a WatermarkGenerator.
>>>
>>> Best regards,
>>>
>>> Dario
>>> On 31.01.22 22:28, John Smith wrote:
>>>
>>> Hi I have the following job... I'm expecting the System.out
>>> .println(key.toString());   to at least print, but nothing prints.
>>>
>>> - .flatMap: Fires prints my debug message once as expected.
>>> - .keyBy: Also fires, but prints my debug message twice.
>>> - .apply: Doesn't seem to fire. The debug statement doesn't seem to
>>> print. I'm expecting it to print the key from above keyBy.
>>>
>>> DataStream slStream = env.fromSource(kafkaSource, 
>>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>>> .uid(kafkaTopic).name(kafkaTopic)
>>> .setParallelism(1)
>>> .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
>>> .uid("map-json-logs").name("map-json-logs");
>>> slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints 
>>> twice
>>> .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
>>> .apply(new WindowFunction>> String, String>, TimeWindow>() {
>>> @Overridepublic void apply(Tuple3>> String> key, TimeWindow window, Iterable input, Collector 
>>> out) throws Exception {
>>> // This should print.
>>> System.out.println(key.toString());// Do nothing for now
>>> }
>>> })
>>> .uid("process").name("process")
>>> ;
>>>
>>>
>>>
>>>


Re: Tumbling window apply will not "fire"

2022-01-31 Thread John Smith
Oh ok. I was reading here:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/streaming_analytics/#latency-vs-completeness
and Idid a cut and paste lol

Ok let you know.

On Mon, 31 Jan 2022 at 17:18, Dario Heinisch 
wrote:

> Then you should be using a process based time window, in your case:
> TumblingProcessingTimeWindows
>
> See
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/
> for more info
> On 31.01.22 23:13, John Smith wrote:
>
> Hi Dario, I don't care about event time I just want to do tumbling window
> over the "processing time" I.e: count whatever I have in the last 5 minutes.
>
> On Mon, 31 Jan 2022 at 17:09, Dario Heinisch 
> wrote:
>
>> Hi John
>>
>> This is because you are using event time (TumblingEventTimeWinodws) but
>> you do not have a event time watermark strategy.
>> It is also why I opened:
>> https://issues.apache.org/jira/browse/FLINK-24623 because I feel like
>> Flink should be throwing an exception in that case
>> on startup.
>>
>> Take a look at the documentation at:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
>> which should have everything.
>>
>> > In order to work with event time, Flink needs to know the events
>> timestamps, meaning each element in the stream needs to have its event
>> timestamp assigned. This is usually done by accessing/extracting the
>> timestamp from > some field in the element by using a TimestampAssigner.
>> > Timestamp assignment goes hand-in-hand with generating watermarks,
>> which tell the system about progress in event time. You can configure this
>> by specifying a WatermarkGenerator.
>>
>> Best regards,
>>
>> Dario
>> On 31.01.22 22:28, John Smith wrote:
>>
>> Hi I have the following job... I'm expecting the System.out
>> .println(key.toString());   to at least print, but nothing prints.
>>
>> - .flatMap: Fires prints my debug message once as expected.
>> - .keyBy: Also fires, but prints my debug message twice.
>> - .apply: Doesn't seem to fire. The debug statement doesn't seem to
>> print. I'm expecting it to print the key from above keyBy.
>>
>> DataStream slStream = env.fromSource(kafkaSource, 
>> WatermarkStrategy.noWatermarks(), "Kafka Source")
>> .uid(kafkaTopic).name(kafkaTopic)
>> .setParallelism(1)
>> .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
>> .uid("map-json-logs").name("map-json-logs");
>> slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints 
>> twice
>> .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
>> .apply(new WindowFunction> String, String>, TimeWindow>() {
>> @Overridepublic void apply(Tuple3> String> key, TimeWindow window, Iterable input, Collector 
>> out) throws Exception {
>> // This should print.
>> System.out.println(key.toString());// Do nothing for now 
>>}
>> })
>> .uid("process").name("process")
>> ;
>>
>>
>>
>>


Re: Tumbling window apply will not "fire"

2022-01-31 Thread John Smith
Hi Dario, I don't care about event time I just want to do tumbling window
over the "processing time" I.e: count whatever I have in the last 5 minutes.

On Mon, 31 Jan 2022 at 17:09, Dario Heinisch 
wrote:

> Hi John
>
> This is because you are using event time (TumblingEventTimeWinodws) but
> you do not have a event time watermark strategy.
> It is also why I opened: https://issues.apache.org/jira/browse/FLINK-24623
> because I feel like Flink should be throwing an exception in that case
> on startup.
>
> Take a look at the documentation at:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
> which should have everything.
>
> > In order to work with event time, Flink needs to know the events
> timestamps, meaning each element in the stream needs to have its event
> timestamp assigned. This is usually done by accessing/extracting the
> timestamp from > some field in the element by using a TimestampAssigner.
> > Timestamp assignment goes hand-in-hand with generating watermarks, which
> tell the system about progress in event time. You can configure this by
> specifying a WatermarkGenerator.
>
> Best regards,
>
> Dario
> On 31.01.22 22:28, John Smith wrote:
>
> Hi I have the following job... I'm expecting the System.out
> .println(key.toString());   to at least print, but nothing prints.
>
> - .flatMap: Fires prints my debug message once as expected.
> - .keyBy: Also fires, but prints my debug message twice.
> - .apply: Doesn't seem to fire. The debug statement doesn't seem to print.
> I'm expecting it to print the key from above keyBy.
>
> DataStream slStream = env.fromSource(kafkaSource, 
> WatermarkStrategy.noWatermarks(), "Kafka Source")
> .uid(kafkaTopic).name(kafkaTopic)
> .setParallelism(1)
> .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
> .uid("map-json-logs").name("map-json-logs");
> slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints 
> twice
> .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
> .apply(new WindowFunction String, String>, TimeWindow>() {
> @Overridepublic void apply(Tuple3 String> key, TimeWindow window, Iterable input, Collector 
> out) throws Exception {
> // This should print.
> System.out.println(key.toString());// Do nothing for now  
>   }
> })
> .uid("process").name("process")
> ;
>
>
>
>


Tumbling window apply will not "fire"

2022-01-31 Thread John Smith
Hi I have the following job... I'm expecting the System.out
.println(key.toString());   to at least print, but nothing prints.

- .flatMap: Fires prints my debug message once as expected.
- .keyBy: Also fires, but prints my debug message twice.
- .apply: Doesn't seem to fire. The debug statement doesn't seem to print.
I'm expecting it to print the key from above keyBy.

DataStream slStream = env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks(), "Kafka Source")
.uid(kafkaTopic).name(kafkaTopic)
.setParallelism(1)
.flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
.uid("map-json-logs").name("map-json-logs");
slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <---
This prints twice
.window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
.apply(new WindowFunction, TimeWindow>() {
@Override
public void apply(Tuple3 key,
TimeWindow window, Iterable input, Collector out)
throws Exception {
// This should print.
System.out.println(key.toString());

// Do nothing for now
}
})
.uid("process").name("process")
;


Re: Kafka Consumer Group Name not set if no checkpointing?

2022-01-31 Thread John Smith
Hi yes, see below. So it only seems to show the consumer offsets if
checkpointing is on... That's the only diff I can see between my two
different jobs. And the moment I enabled it on the job. It started showing
in Kafka Explorer here: https://www.kafkatool.com/

return KafkaSource.builder()
.setBootstrapServers(bootstrapServers)
.setTopics(topic)
.setValueOnlyDeserializer(new VertxJsonSchema())
.setGroupId(consumerGroup)
.setStartingOffsets(oi)
.setProperties(props)
.build();


On Mon, 31 Jan 2022 at 12:03, Fabian Paul  wrote:

> Hi John,
> First I would like to ask you to give us some more information about
> how you consume from Kafka with Flink. It is currently recommended to
> use the KafkaSource that subsumes the deprecated FlinkKafkaConsumer.
>
> One thing to already note is that by default Flink does not commit the
> Kafka to offset back to the topic because it is not needed from a
> Flink perspective and is only supported on a best-effort basis if a
> checkpoint completes.
>
> I am not very familiar with Kafka Explorer but I can imagine it only
> shows the consumer group if there are actually committed offsets
>
> Best,
> Fabian
>
> On Mon, Jan 31, 2022 at 3:41 PM John Smith  wrote:
> >
> > Hi using flink 1.14.3, I have a job that doesn't use checkpointing. I
> have noticed that the Kafka Consumer Group is not set?
> >
> > I use Kafka Explorer to see all the consumers and when I run the job I
> don't see the consumer group. Finally I decided to enable checkpointing and
> restart the job and finally saw the consumer group.
> >
> > Is this expected behaviour?
>


Kafka Consumer Group Name not set if no checkpointing?

2022-01-31 Thread John Smith
Hi using flink 1.14.3, I have a job that doesn't use checkpointing. I have
noticed that the Kafka Consumer Group is not set?

I use Kafka Explorer to see all the consumers and when I run the job I
don't see the consumer group. Finally I decided to enable checkpointing and
restart the job and finally saw the consumer group.

Is this expected behaviour?


Re: How to run in IDE?

2022-01-25 Thread John Smith
I'm using: final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

But no go.

On Mon, 24 Jan 2022 at 16:35, John Smith  wrote:

> Hi using Flink 1.14.3 with gradle. I explicitly added the flink client
> dependency and the job starts but it quits with...
>
> In Flink 1.10 the job worked as is. How do I set the number of slots and
> is there any other settings for the IDE?
>
> 16:29:50,633 INFO
>  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
>  - Received resource requirements from job
> 3a3e9c46da413071392bce161c39270f:
> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
> numberOfRequiredSlots=2}]
> 16:29:50,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- Sink: Print to Std. Out (14/16) (d7c4fbf5f23f3118e54998f2b35338c1)
> switched from CANCELING to CANCELED.
>


How to run in IDE?

2022-01-24 Thread John Smith
Hi using Flink 1.14.3 with gradle. I explicitly added the flink client
dependency and the job starts but it quits with...

In Flink 1.10 the job worked as is. How do I set the number of slots and is
there any other settings for the IDE?

16:29:50,633 INFO
 org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
 - Received resource requirements from job
3a3e9c46da413071392bce161c39270f:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=2}]
16:29:50,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
 - Sink: Print to Std. Out (14/16) (d7c4fbf5f23f3118e54998f2b35338c1)
switched from CANCELING to CANCELED.


Show plan in UI not working.

2022-01-23 Thread John Smith
Just I'm case but in 1.14.x regardless of the job manager is leader or not.
Before submitting a job of you click on "Show Plan" it just shows a blank
window.

I'm assuming it's similar issue as the deserialozation ones.


Re: Unhandled exception in flink 1.14.2

2022-01-21 Thread John Smith
Ok I see. So I guess in 1.4.3, one was fixed but broke something else?

Because in 1.4.2 it feels like it worked after a refresh but with 1.4.3 it
always fails looks like.


On Fri., Jan. 21, 2022, 3:29 a.m. Chesnay Schepler, 
wrote:

> What you are seeing in 1.14.3 is a separate bug, that behaves very
> similarly. https://issues.apache.org/jira/browse/FLINK-25732
>
> On 20/01/2022 22:11, John Smith wrote:
>
> As per another recent thread. This is still an issue.
>
> On Wed, 19 Jan 2022 at 06:36, Chesnay Schepler  wrote:
>
>> This is a serialization bug in Flink, see
>> https://issues.apache.org/jira/browse/FLINK-24550.
>> It will be fixed in the upcoming 1.14.3 release.
>>
>> On 19/01/2022 09:01, Caizhi Weng wrote:
>>
>> Hi!
>>
>> To print out gc logs of job manager you can add this configuration to
>> flink-conf.yaml
>>
>> env.java.opts.jobmanager: -Xloggc:/tmp/jobmanager-gc.log
>> -XX:+PrintGCDetails -XX:+PrintGCDateStamps
>>
>> This will print gc logs to /tmp/jobmanager-gc.log.
>>
>> I'm not familiar with the garbage collection metrics page. If the unit of
>> time is ms then gc does not seem to be heavy. However I would still
>> recommend to print out gc logs for a double check.
>>
>> John Smith  于2022年1月19日周三 06:43写道:
>>
>>> I think I may know what is causing the issue... So I have 3 job managers.
>>>
>>> 1- I Navigated to a non leader UI and submitted a new job...
>>> 2- The UI timed out with grey lines
>>> 3- Some Internal Server error messages appeared.
>>> 4- Going back to the leader UI checking the running jobs, the job seems
>>> to have been submitted and running.
>>> 5- Going back to the job manager UI that failed, now shows ok.
>>>
>>> And the logs are as follows... And below are the GC metrics from the UI.
>>>
>>> 2022-01-18 22:33:24,574 INFO
>>> org.apache.flink.client.deployment.application.executors.
>>> EmbeddedExecutor [] - Job 2297fdac52fa7191afee9ec4ff11c805 is submitted.
>>> 2022-01-18 22:33:24,574 INFO
>>> org.apache.flink.client.deployment.application.executors.
>>> EmbeddedExecutor [] - Submitting Job with JobId=2297f
>>> dac52fa7191afee9ec4ff11c805.
>>> 2022-01-18 22:34:00,618 ERROR org.apache.flink.runtime.rest.handler.job.
>>> JobDetailsHandler [] - Unhandled exception.
>>> java.util.concurrent.CancellationException: null
>>> at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:
>>> 2276) ~[?:1.8.0_312]
>>> at org.apache.flink.runtime.rest.handler.legacy.
>>> DefaultExecutionGraphCache.getExecutionGraphInternal(
>>> DefaultExecutionGraphCache.java:98) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.legacy.
>>> DefaultExecutionGraphCache.getExecutionGraphInfo(
>>> DefaultExecutionGraphCache.java:67) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.job.
>>> AbstractExecutionGraphHandler.handleRequest(
>>> AbstractExecutionGraphHandler.java:81) ~[flink-dist_2.12-1.14.2.jar:1.14
>>> .2]
>>> at org.apache.flink.runtime.rest.handler.AbstractRestHandler
>>> .respondToRequest(AbstractRestHandler.java:83) ~[flink-dist_2.12-1.14.2
>>> .jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.AbstractHandler
>>> .respondAsLeader(AbstractHandler.java:195) ~[flink-dist_2.12-1.14.2.jar:
>>> 1.14.2]
>>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>>> .lambda$channelRead0$0(LeaderRetrievalHandler.java:83) ~[flink-dist_2.12
>>> -1.14.2.jar:1.14.2]
>>> at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
>>> at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer
>>> .java:45) [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>>> .channelRead0(LeaderRetrievalHandler.java:80) [flink-dist_2.12-1.14.2
>>> .jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>>> .channelRead0(LeaderRetrievalHandler.java:49) [flink-dist_2.12-1.14.2
>>> .jar:1.14.2]
>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>> SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler
>>> .java:99) [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>> AbstractChannelHandlerContext.invokeChannelRead(
>>> AbstractChannelHandlerContext.java:379) [flink-dist_2.12-1.14.2.jar:1.14
&

Re: Unhandled exception in flink 1.14.2

2022-01-20 Thread John Smith
As per another recent thread. This is still an issue.

On Wed, 19 Jan 2022 at 06:36, Chesnay Schepler  wrote:

> This is a serialization bug in Flink, see
> https://issues.apache.org/jira/browse/FLINK-24550.
> It will be fixed in the upcoming 1.14.3 release.
>
> On 19/01/2022 09:01, Caizhi Weng wrote:
>
> Hi!
>
> To print out gc logs of job manager you can add this configuration to
> flink-conf.yaml
>
> env.java.opts.jobmanager: -Xloggc:/tmp/jobmanager-gc.log
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps
>
> This will print gc logs to /tmp/jobmanager-gc.log.
>
> I'm not familiar with the garbage collection metrics page. If the unit of
> time is ms then gc does not seem to be heavy. However I would still
> recommend to print out gc logs for a double check.
>
> John Smith  于2022年1月19日周三 06:43写道:
>
>> I think I may know what is causing the issue... So I have 3 job managers.
>>
>> 1- I Navigated to a non leader UI and submitted a new job...
>> 2- The UI timed out with grey lines
>> 3- Some Internal Server error messages appeared.
>> 4- Going back to the leader UI checking the running jobs, the job seems
>> to have been submitted and running.
>> 5- Going back to the job manager UI that failed, now shows ok.
>>
>> And the logs are as follows... And below are the GC metrics from the UI.
>>
>> 2022-01-18 22:33:24,574 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Job 2297fdac52fa7191afee9ec4ff11c805 is submitted.
>> 2022-01-18 22:33:24,574 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Submitting Job with JobId=2297fdac52fa7191afee9ec4ff11c805.
>> 2022-01-18 22:34:00,618 ERROR org.apache.flink.runtime.rest.handler.job.
>> JobDetailsHandler [] - Unhandled exception.
>> java.util.concurrent.CancellationException: null
>> at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:
>> 2276) ~[?:1.8.0_312]
>> at org.apache.flink.runtime.rest.handler.legacy.
>> DefaultExecutionGraphCache.getExecutionGraphInternal(
>> DefaultExecutionGraphCache.java:98) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.legacy.
>> DefaultExecutionGraphCache.getExecutionGraphInfo(
>> DefaultExecutionGraphCache.java:67) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.job.
>> AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler
>> .java:81) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.AbstractRestHandler
>> .respondToRequest(AbstractRestHandler.java:83) ~[flink-dist_2.12-1.14.2
>> .jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(
>> AbstractHandler.java:195) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>> .lambda$channelRead0$0(LeaderRetrievalHandler.java:83) ~[flink-dist_2.12-
>> 1.14.2.jar:1.14.2]
>> at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
>> at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer
>> .java:45) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>> .channelRead0(LeaderRetrievalHandler.java:80) [flink-dist_2.12-1.14.2
>> .jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>> .channelRead0(LeaderRetrievalHandler.java:49) [flink-dist_2.12-1.14.2
>> .jar:1.14.2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:
>> 99) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:379) [flink-dist_2.12-1.14.2.jar:1.14.
>> 2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:365) [flink-dist_2.12-1.14.2.jar:1.14.
>> 2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.fireChannelRead(
>> AbstractChannelHandlerContext.java:357) [flink-dist_2.12-1.14.2.jar:1.14.
>> 2]
>> at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(
>> RouterHandler.java:115) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.router.RouterHandler
>> .channelRead0(RouterHandler.java:94) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.router.RouterHandler
>

Re: Flink 1.14.3: Can not access job information from a jobmanager UI

2022-01-20 Thread John Smith
I had the same issue in my thread it was mentioned that it was supposed to
be fixed in 1.14.3

On Thu, 20 Jan 2022 at 07:40, Martin  wrote:

> Thanks for the quick response, I assumed thats already known, but was not
> able to find the issue. Thanks :)
>
> Chesnay Schepler schrieb am 20.01.2022 13:36 (GMT +01:00):
>
> This is a bug in Flink for which I have filed a ticket:
> https://issues.apache.org/jira/browse/FLINK-25732
>
> As is you can only request the job overview from the leading jobmanager.
>
> On 20/01/2022 13:15, Martin wrote:
>
>
>
>


Re: Unhandled exception in flink 1.14.2

2022-01-18 Thread John Smith
]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(
FileUploadHandler.java:238) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(
FileUploadHandler.java:71) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:379) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:365) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
.java:357) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
CombinedChannelDuplexHandler$DelegatingChannelHandlerContext
.fireChannelRead(CombinedChannelDuplexHandler.java:436) [flink-dist_2.12-
1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:
251) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:379) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:365) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
.java:357) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
1410) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:379) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:365) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline
.fireChannelRead(DefaultChannelPipeline.java:919) [flink-dist_2.12-1.14.2
.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.
AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKey(NioEventLoop.java:719) [flink-dist_2.12-1.14.2.jar:1.14.
2]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKeysOptimized(NioEventLoop.java:655) [flink-dist_2.12-1.14.2
.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKeys(NioEventLoop.java:581) [flink-dist_2.12-1.14.2.jar:1.14
.2]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(
NioEventLoop.java:493) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2
.run(ThreadExecutorMap.java:74) [flink-dist_2.12-1.14.2.jar:1.14.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]

These are the stats displayed on the UI for the garbage collector...

Advanced
JVM (Heap/Non-Heap) Memory
TypeCommittedUsedMaximum
Heap 2.87 GB 571 MB 2.87 GB
Non-Heap 136 MB 128 MB 744 MB
Outside JVM Memory
TypeCountUsedCapacity
Direct 33 642 KB 642 KB
Mapped 0 0 B 0 B
Garbage Collection
CollectorCountTime
PS_MarkSweep 3 346
PS_Scavenge 83 2772


On Tue, 18 Jan 2022 at 17:15, John Smith  wrote:

> Sorry, unhandled exception.
>
> On Tue, 18 Jan 2022 at 15:16, John Smith  wrote:
>
>> Also that I handled exception is printed in job managers.
>>
>> On Tue., Jan. 18, 2022, 10:11 a.m. John Smith, 
>> wrote:
>>
>>> I actually mean the job manager. I run a total of three job managers for
>>> HA.
>>>
>>> For example of I click running jobs, it displays light grey boxes for a
>>> while and then top right corner throws internal server error. But after if
>>> I refresh it's ok I see the list. It seems to happen on the non leader job
>>> manager UI

Re: Unhandled exception in flink 1.14.2

2022-01-18 Thread John Smith
Sorry, unhandled exception.

On Tue, 18 Jan 2022 at 15:16, John Smith  wrote:

> Also that I handled exception is printed in job managers.
>
> On Tue., Jan. 18, 2022, 10:11 a.m. John Smith, 
> wrote:
>
>> I actually mean the job manager. I run a total of three job managers for
>> HA.
>>
>> For example of I click running jobs, it displays light grey boxes for a
>> while and then top right corner throws internal server error. But after if
>> I refresh it's ok I see the list. It seems to happen on the non leader job
>> manager UIs. Looking at the UI metric for the job manager doesn't seem to
>> indicate anything out of the ordinary.
>>
>> Is there a java 8 command tool that can hook into the live PID and
>> display GC stats?
>>
>> On Mon., Jan. 17, 2022, 12:06 a.m. Caizhi Weng, 
>> wrote:
>>
>>> Hi!
>>>
>>> "When I use the UI and just navigate from one jobmanager ui to the
>>> other", I guess you mean you're talking about task managers instead of job
>>> managers (there is only one job manager per web ui).
>>>
>>> This long pause usually indicates that the component (no matter task
>>> managers or job manager) you're jumping to is very busy so that they can't
>>> handle web requests. This is not a healthy state and you should look into
>>> it. The most probable cause is heavy GC. To determine this you can print GC
>>> details to log and see if there are long GC pauses.
>>>
>>> John Smith  于2022年1月15日周六 09:53写道:
>>>
>>>> Hi using 1.14.2, running 3 job nodes.
>>>>
>>>> Everything seems to work ok.
>>>>
>>>> When I use the UI and just navigate from one jobmanager ui to the
>>>> other, it sometimes seems to take long or timeout and I see an "Internal
>>>> Server Error" popup on the top right messages.
>>>>
>>>> Looking at the logs I see this, but not sure it's related...
>>>>
>>>> 2022-01-15 00:04:44,925 ERROR
>>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
>>>> Unhandled exception.
>>>> java.util.concurrent.CancellationException: null
>>>> at
>>>> java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
>>>> ~[?:1.8.0_312]
>>>> at
>>>> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
>>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
>>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
>>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
>>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
>>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
>>>> at
>>>> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.i

Re: Unhandled exception in flink 1.14.2

2022-01-18 Thread John Smith
Also that I handled exception is printed in job managers.

On Tue., Jan. 18, 2022, 10:11 a.m. John Smith, 
wrote:

> I actually mean the job manager. I run a total of three job managers for
> HA.
>
> For example of I click running jobs, it displays light grey boxes for a
> while and then top right corner throws internal server error. But after if
> I refresh it's ok I see the list. It seems to happen on the non leader job
> manager UIs. Looking at the UI metric for the job manager doesn't seem to
> indicate anything out of the ordinary.
>
> Is there a java 8 command tool that can hook into the live PID and display
> GC stats?
>
> On Mon., Jan. 17, 2022, 12:06 a.m. Caizhi Weng, 
> wrote:
>
>> Hi!
>>
>> "When I use the UI and just navigate from one jobmanager ui to the
>> other", I guess you mean you're talking about task managers instead of job
>> managers (there is only one job manager per web ui).
>>
>> This long pause usually indicates that the component (no matter task
>> managers or job manager) you're jumping to is very busy so that they can't
>> handle web requests. This is not a healthy state and you should look into
>> it. The most probable cause is heavy GC. To determine this you can print GC
>> details to log and see if there are long GC pauses.
>>
>> John Smith  于2022年1月15日周六 09:53写道:
>>
>>> Hi using 1.14.2, running 3 job nodes.
>>>
>>> Everything seems to work ok.
>>>
>>> When I use the UI and just navigate from one jobmanager ui to the other,
>>> it sometimes seems to take long or timeout and I see an "Internal Server
>>> Error" popup on the top right messages.
>>>
>>> Looking at the logs I see this, but not sure it's related...
>>>
>>> 2022-01-15 00:04:44,925 ERROR
>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
>>> Unhandled exception.
>>> java.util.concurrent.CancellationException: null
>>> at
>>> java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
>>> ~[?:1.8.0_312]
>>> at
>>> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
>>> at
>>> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at
>>> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHan

Re: Unhandled exception in flink 1.14.2

2022-01-18 Thread John Smith
I actually mean the job manager. I run a total of three job managers for HA.

For example of I click running jobs, it displays light grey boxes for a
while and then top right corner throws internal server error. But after if
I refresh it's ok I see the list. It seems to happen on the non leader job
manager UIs. Looking at the UI metric for the job manager doesn't seem to
indicate anything out of the ordinary.

Is there a java 8 command tool that can hook into the live PID and display
GC stats?

On Mon., Jan. 17, 2022, 12:06 a.m. Caizhi Weng, 
wrote:

> Hi!
>
> "When I use the UI and just navigate from one jobmanager ui to the other",
> I guess you mean you're talking about task managers instead of job managers
> (there is only one job manager per web ui).
>
> This long pause usually indicates that the component (no matter task
> managers or job manager) you're jumping to is very busy so that they can't
> handle web requests. This is not a healthy state and you should look into
> it. The most probable cause is heavy GC. To determine this you can print GC
> details to log and see if there are long GC pauses.
>
> John Smith  于2022年1月15日周六 09:53写道:
>
>> Hi using 1.14.2, running 3 job nodes.
>>
>> Everything seems to work ok.
>>
>> When I use the UI and just navigate from one jobmanager ui to the other,
>> it sometimes seems to take long or timeout and I see an "Internal Server
>> Error" popup on the top right messages.
>>
>> Looking at the logs I see this, but not sure it's related...
>>
>> 2022-01-15 00:04:44,925 ERROR
>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
>> Unhandled exception.
>> java.util.concurrent.CancellationException: null
>> at
>> java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
>> ~[?:1.8.0_312]
>> at
>> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
>> at
>> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>> [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChanne

Unhandled exception in flink 1.14.2

2022-01-14 Thread John Smith
Hi using 1.14.2, running 3 job nodes.

Everything seems to work ok.

When I use the UI and just navigate from one jobmanager ui to the other, it
sometimes seems to take long or timeout and I see an "Internal Server
Error" popup on the top right messages.

Looking at the logs I see this, but not sure it's related...

2022-01-15 00:04:44,925 ERROR
org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
Unhandled exception.
java.util.concurrent.CancellationException: null
at
java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
~[?:1.8.0_312]
at
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
at
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:238)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:71)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

Re: How to handle java.lang.OutOfMemoryError: Metaspace

2021-12-29 Thread John Smith
Given the numbers above from JCMD. You think I should be ok with 2GB
metaspace? That was captured while all jobs where running on the cluster
for that 1 node.


I set it to 2GB. But none of the above numbers indicated max 2GB metaspace.

On Mon., Dec. 27, 2021, 10:47 a.m. John Smith, 
wrote:

> Ok all settings above are for smaller dev cluster and I'm experimenting to
> set metasize to 2GB. It runs same jobs as production just less volume in
> terms of data.
>
> The below snapshot of JCMD are of a slightly bigger task manager and the
> active cluster... It also once in a while does metaspace so thinking
> updating metaspace to 2GB. This is what started the actual investigation.
>
> taskmanager.memory.flink.size: 10240m
> taskmanager.memory.jvm-metaspace.size: 1024m <-- Up to 2GB.
> taskmanager.numberOfTaskSlots: 12
>
> jcmd 2128 GC.heap_info
> 2128:
>  garbage-first heap   total 5111808K, used 2530277K [0x00068880,
> 0x000688a04e00, 0x0007c080)
>   region size 2048K, 810 young (1658880K), 4 survivors (8192K)
>  Metaspace   used 998460K, capacity 1022929K, committed 1048576K,
> reserved 1972224K
>   class spaceused 112823K, capacity 121063K, committed 126024K,
> reserved 1048576K
>
> On Mon, 27 Dec 2021 at 10:27, John Smith  wrote:
>
>> Yes standalone cluster. 3 zoo, 3 job, 3 tasks.
>>
>> The task managers have taskslots at double core. So 2*4
>>
>> I think metaspace of 2GB is ok. I'll try to get some jcmd stats.
>>
>> The jobs are fairly straight forward ETL they read from Kafka, do some
>> json parsing, using vertx.io json parser and either Insert to apache
>> ignite cache or jdbc db.
>>
>>
>> On Sun., Dec. 26, 2021, 8:46 p.m. Xintong Song, 
>> wrote:
>>
>>> Hi John,
>>>
>>> Sounds to me you have a Flink standalone cluster deployed directly on
>>> physical hosts. If that is the case, use `t.m.flink.size` instead of
>>> `t.m.process.size`. The latter does not limit the overall memory
>>> consumption of the processes, and is only used for calculating how much
>>> non-JVM memory the process should leave in a containerized setup, which
>>> does no good in a non-containerized setup.
>>>
>>> When running into a Metaspace OOM, the standard solution is to increase
>>> `t.m.jvm-metaspace.size`. If this is impractical due to the physical
>>> limitations, you may also try to decrease `taskmanager.numberOfTaskSlots`.
>>> If you have multiple jobs submitted to a shared Flink cluster, decreasing
>>> the number of slots in a task manager should also reduce the amount of
>>> classes loaded by the JVM, thus requiring less metaspace.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Mon, Dec 27, 2021 at 9:08 AM John Smith 
>>> wrote:
>>>
>>>> Ok I tried taskmanager.memory.process.size: 7168m
>>>>
>>>> It's worst, the task manager can barely start before it throws
>>>> java.lang.OutOfMemoryError: Metaspace
>>>>
>>>> I will try...
>>>> taskmanager.memory.flink.size: 5120m
>>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>>>
>>>>
>>>> On Sun, 26 Dec 2021 at 19:46, John Smith 
>>>> wrote:
>>>>
>>>>> Hi running Flink 1.10
>>>>>
>>>>> I have
>>>>>
>>>>> taskmanager.memory.flink.size: 6144m
>>>>> taskmanager.memory.jvm-metaspace.size: 1024m
>>>>> taskmanager.numberOfTaskSlots: 8
>>>>> parallelism.default: 1
>>>>>
>>>>> 1- The host has a physical ram of 8GB. I'm better off just to
>>>>> configure "taskmanager.memory.process.size" as 7GB and let flink figure it
>>>>> out?
>>>>> 2- Is there a way for me to calculate how much metspace my jobs
>>>>> require or are using?
>>>>>
>>>>> 2021-12-24 04:53:32,511 ERROR
>>>>> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
>>>>> Thread 'flink-akka.actor.default-dispatcher-86' produced an uncaught
>>>>> exception. Stopping the process...
>>>>> java.lang.OutOfMemoryError: Metaspace
>>>>>
>>>>


Re: How to handle java.lang.OutOfMemoryError: Metaspace

2021-12-27 Thread John Smith
Ok all settings above are for smaller dev cluster and I'm experimenting to
set metasize to 2GB. It runs same jobs as production just less volume in
terms of data.

The below snapshot of JCMD are of a slightly bigger task manager and the
active cluster... It also once in a while does metaspace so thinking
updating metaspace to 2GB. This is what started the actual investigation.

taskmanager.memory.flink.size: 10240m
taskmanager.memory.jvm-metaspace.size: 1024m <-- Up to 2GB.
taskmanager.numberOfTaskSlots: 12

jcmd 2128 GC.heap_info
2128:
 garbage-first heap   total 5111808K, used 2530277K [0x00068880,
0x000688a04e00, 0x0007c080)
  region size 2048K, 810 young (1658880K), 4 survivors (8192K)
 Metaspace   used 998460K, capacity 1022929K, committed 1048576K,
reserved 1972224K
  class spaceused 112823K, capacity 121063K, committed 126024K,
reserved 1048576K

On Mon, 27 Dec 2021 at 10:27, John Smith  wrote:

> Yes standalone cluster. 3 zoo, 3 job, 3 tasks.
>
> The task managers have taskslots at double core. So 2*4
>
> I think metaspace of 2GB is ok. I'll try to get some jcmd stats.
>
> The jobs are fairly straight forward ETL they read from Kafka, do some
> json parsing, using vertx.io json parser and either Insert to apache
> ignite cache or jdbc db.
>
>
> On Sun., Dec. 26, 2021, 8:46 p.m. Xintong Song, 
> wrote:
>
>> Hi John,
>>
>> Sounds to me you have a Flink standalone cluster deployed directly on
>> physical hosts. If that is the case, use `t.m.flink.size` instead of
>> `t.m.process.size`. The latter does not limit the overall memory
>> consumption of the processes, and is only used for calculating how much
>> non-JVM memory the process should leave in a containerized setup, which
>> does no good in a non-containerized setup.
>>
>> When running into a Metaspace OOM, the standard solution is to increase
>> `t.m.jvm-metaspace.size`. If this is impractical due to the physical
>> limitations, you may also try to decrease `taskmanager.numberOfTaskSlots`.
>> If you have multiple jobs submitted to a shared Flink cluster, decreasing
>> the number of slots in a task manager should also reduce the amount of
>> classes loaded by the JVM, thus requiring less metaspace.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Dec 27, 2021 at 9:08 AM John Smith 
>> wrote:
>>
>>> Ok I tried taskmanager.memory.process.size: 7168m
>>>
>>> It's worst, the task manager can barely start before it throws
>>> java.lang.OutOfMemoryError: Metaspace
>>>
>>> I will try...
>>> taskmanager.memory.flink.size: 5120m
>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>>
>>>
>>> On Sun, 26 Dec 2021 at 19:46, John Smith  wrote:
>>>
>>>> Hi running Flink 1.10
>>>>
>>>> I have
>>>>
>>>> taskmanager.memory.flink.size: 6144m
>>>> taskmanager.memory.jvm-metaspace.size: 1024m
>>>> taskmanager.numberOfTaskSlots: 8
>>>> parallelism.default: 1
>>>>
>>>> 1- The host has a physical ram of 8GB. I'm better off just to configure
>>>> "taskmanager.memory.process.size" as 7GB and let flink figure it out?
>>>> 2- Is there a way for me to calculate how much metspace my jobs require
>>>> or are using?
>>>>
>>>> 2021-12-24 04:53:32,511 ERROR
>>>> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
>>>> Thread 'flink-akka.actor.default-dispatcher-86' produced an uncaught
>>>> exception. Stopping the process...
>>>> java.lang.OutOfMemoryError: Metaspace
>>>>
>>>


Re: How to handle java.lang.OutOfMemoryError: Metaspace

2021-12-27 Thread John Smith
Yes standalone cluster. 3 zoo, 3 job, 3 tasks.

The task managers have taskslots at double core. So 2*4

I think metaspace of 2GB is ok. I'll try to get some jcmd stats.

The jobs are fairly straight forward ETL they read from Kafka, do some json
parsing, using vertx.io json parser and either Insert to apache ignite
cache or jdbc db.


On Sun., Dec. 26, 2021, 8:46 p.m. Xintong Song, 
wrote:

> Hi John,
>
> Sounds to me you have a Flink standalone cluster deployed directly on
> physical hosts. If that is the case, use `t.m.flink.size` instead of
> `t.m.process.size`. The latter does not limit the overall memory
> consumption of the processes, and is only used for calculating how much
> non-JVM memory the process should leave in a containerized setup, which
> does no good in a non-containerized setup.
>
> When running into a Metaspace OOM, the standard solution is to increase
> `t.m.jvm-metaspace.size`. If this is impractical due to the physical
> limitations, you may also try to decrease `taskmanager.numberOfTaskSlots`.
> If you have multiple jobs submitted to a shared Flink cluster, decreasing
> the number of slots in a task manager should also reduce the amount of
> classes loaded by the JVM, thus requiring less metaspace.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Dec 27, 2021 at 9:08 AM John Smith  wrote:
>
>> Ok I tried taskmanager.memory.process.size: 7168m
>>
>> It's worst, the task manager can barely start before it throws
>> java.lang.OutOfMemoryError: Metaspace
>>
>> I will try...
>> taskmanager.memory.flink.size: 5120m
>> taskmanager.memory.jvm-metaspace.size: 2048m
>>
>>
>> On Sun, 26 Dec 2021 at 19:46, John Smith  wrote:
>>
>>> Hi running Flink 1.10
>>>
>>> I have
>>>
>>> taskmanager.memory.flink.size: 6144m
>>> taskmanager.memory.jvm-metaspace.size: 1024m
>>> taskmanager.numberOfTaskSlots: 8
>>> parallelism.default: 1
>>>
>>> 1- The host has a physical ram of 8GB. I'm better off just to configure
>>> "taskmanager.memory.process.size" as 7GB and let flink figure it out?
>>> 2- Is there a way for me to calculate how much metspace my jobs require
>>> or are using?
>>>
>>> 2021-12-24 04:53:32,511 ERROR
>>> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
>>> Thread 'flink-akka.actor.default-dispatcher-86' produced an uncaught
>>> exception. Stopping the process...
>>> java.lang.OutOfMemoryError: Metaspace
>>>
>>


Re: How to handle java.lang.OutOfMemoryError: Metaspace

2021-12-26 Thread John Smith
Ok I tried taskmanager.memory.process.size: 7168m

It's worst, the task manager can barely start before it throws
java.lang.OutOfMemoryError: Metaspace

I will try...
taskmanager.memory.flink.size: 5120m
taskmanager.memory.jvm-metaspace.size: 2048m


On Sun, 26 Dec 2021 at 19:46, John Smith  wrote:

> Hi running Flink 1.10
>
> I have
>
> taskmanager.memory.flink.size: 6144m
> taskmanager.memory.jvm-metaspace.size: 1024m
> taskmanager.numberOfTaskSlots: 8
> parallelism.default: 1
>
> 1- The host has a physical ram of 8GB. I'm better off just to configure
> "taskmanager.memory.process.size" as 7GB and let flink figure it out?
> 2- Is there a way for me to calculate how much metspace my jobs require or
> are using?
>
> 2021-12-24 04:53:32,511 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
> Thread 'flink-akka.actor.default-dispatcher-86' produced an uncaught
> exception. Stopping the process...
> java.lang.OutOfMemoryError: Metaspace
>


How to handle java.lang.OutOfMemoryError: Metaspace

2021-12-26 Thread John Smith
Hi running Flink 1.10

I have

taskmanager.memory.flink.size: 6144m
taskmanager.memory.jvm-metaspace.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1

1- The host has a physical ram of 8GB. I'm better off just to configure
"taskmanager.memory.process.size" as 7GB and let flink figure it out?
2- Is there a way for me to calculate how much metspace my jobs require or
are using?

2021-12-24 04:53:32,511 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
Thread 'flink-akka.actor.default-dispatcher-86' produced an uncaught
exception. Stopping the process...
java.lang.OutOfMemoryError: Metaspace


Re: How to know if Job nodes are registered in cluster?

2021-12-21 Thread John Smith
Ok so only the leader will indicate it's the leader. The other just say
they are waiting for a lock...

On Tue., Dec. 21, 2021, 9:42 a.m. David Morávek,  wrote:

> Hi John,
>
> there is usually no need to run multiple JM, if you're able to start a new
> one quickly after failure (eg. when you're running on kubernetes). There is
> always only single active leader and other JMs effectively do nothing
> besides competing for the leadership. Zookeeper based HA uses the
> DefaultLeaderRetrievalService, which logs leadership information on DEBUG
> level.
>
> Best,
> D.
>
> On Sun, Dec 19, 2021 at 6:38 AM John Smith  wrote:
>
>> Hi running flink 1.10
>>
>> I have 3 zookeeper nodes and 3 job nodes.
>>
>> 1 nodes has specifically indicated that it was granted leadership with
>> token.
>> The other 2 job nodes. Indicate: Starting ZooKeeperLeaderElectionService
>> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
>> So is that enough to know. Usually isn't there some message printed on each
>> node indicating to each other who is leader and who is "present"?
>>
>


How to know if Job nodes are registered in cluster?

2021-12-18 Thread John Smith
Hi running flink 1.10

I have 3 zookeeper nodes and 3 job nodes.

1 nodes has specifically indicated that it was granted leadership with
token.
The other 2 job nodes. Indicate: Starting ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
So is that enough to know. Usually isn't there some message printed on each
node indicating to each other who is leader and who is "present"?


Re: Windows and data loss.

2021-12-07 Thread John Smith
blocked
>   2. Can you publish/share the 3 odd lines of code for your watermark
>   strategy setup?
>
>
>
> Just as said before, ignoring-late-events is a default strategy, that can
> be adjusted by means of a custom window trigger which trades off between
> latency, state size, correctness of the final results.
>
>
>
> Thias
>
>
>
> *From:* John Smith 
> *Sent:* Freitag, 26. November 2021 17:17
> *To:* Schwalbe Matthias 
> *Cc:* Caizhi Weng ; user 
> *Subject:* Re: Windows and data loss.
>
>
>
> Or as an example we have a 5 minutes window and lateness of 5 minutes.
>
> We have the following events in the logs
> 10:00:01 PM > Already pushed to Kafka
> 10:00:30 PM > Already pushed to Kafka
> 10:01:00 PM > Already pushed to Kafka
> 10:03:45 PM > Already pushed to Kafka
> 10:04:00 PM > Log agent crashed for 30 minutes not delivered to Kafla
> yet
> 10:05:10 PM > Pushed to Kafka cause I came from a log agent that isn't
> dead.
>
> Flink window of 10:00:00
> 10:00:01 PM > Received
> 10:00:30 PM > Received
> 10:01:00 PM > Received
> 10:03:45 PM > Received
> 10:04:00 PM > Still nothing
>
> Flink window of 10:00:00 5 lateness minutes are up.
> 10:00:01 PM > Counted
> 10:00:30 PM > Counted
> 10:01:00 PM > Counted
> 10:03:45 PM > Counted
> 10:04:00 PM > Still nothing
>
> Flink window of 10:05:00 started
>
> 10:05:10 PM.> I'm new cause I came from a log agent that isn't dead.
> 10:04:00 PM > Still nothing
>
> Flink window of 10:05:00 5 lateness minutes are up.
> 10:05:10 PM.> I have been counted, I'm happy!
> 10:04:00 PM > Still nothing
>
> And so on...
>
> Flink window of 10:30:00 started
> 10:04:00 PM > Hi guys, sorry I'm late 30 minutes, I ran into log agent
> problems. Sorry you are late, you missed the Flink bus.
>
>
>
> On Fri, 26 Nov 2021 at 10:53, John Smith  wrote:
>
> Ok,
>
>
> So processing time we get 100% accuracy because we don't care when the
> event comes, we just count and move along.
>
> As for event time processing, what I meant to say is if for example if the
> log shipper is late at pushing events into Kafka, Flink will not notice
> this, the watermarks will keep watermarking. So given that, let's say we
> have a window of 5 minutes and a lateness of 5 minutes, it means we will
> see counts on the "dashboard" every 10 minutes. But say the log shipper
> fails/falls behind for 30 minutes or more, the Flink Kafka consumer will
> simply not see any events and it will continue chugging along, after 30
> minutes a late event comes in at 2 windows already too late, that event is
> discarded.
>
> Or did I miss the point on the last part?
>
>
>
>
>
> On Fri, 26 Nov 2021 at 09:38, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Actually not, because processing-time does not matter at all.
>
> Event-time timers are always compared to watermark-time progress.
>
> If system happens to be compromised for (say) 4 hours, also watermarks
> won’t progress, hence the windows get not evicted and wait for watermarks
> to pick up from when the system crashed.
>
>
>
> Your watermark strategy can decide how strict you handle time progress:
>
>- Super strict: the watermark time indicates that there will be no
>events with an older timestamp
>- Semi strict: you accept late events and give a time-range when this
>can happen (still processing time put aside)
>
>
>- You need to configure acceptable lateness in your windowing operator
>   - Accepted lateness implies higher overall latency
>
>
>- Custom strategy
>
>
>- Use a combination of accepted lateness and a custom trigger in your
>   windowing operator
>   - The trigger decide when and how often window results are emitted
>   - The following operator would the probably implement some
>   idempotence/updating scheme for the window values
>   - This way you get immediate low latency results and allow for
>   later corrections if late events arrive
>
>
>
> My favorite source on this is Tyler Akidau’s book [1] and the excerpt
> blog: [2] [3]
>
> I believe his code uses Beam, but the same ideas can be implemented
> directly in Flink API
>
>
>
> [1] https://www.oreilly.com/library/view/streaming-systems/9781491983867/
>
> [2] https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/
>
> [3] https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/
>
>
>
> … happy to discuss

Re: Windows and data loss.

2021-11-26 Thread John Smith
Or as an example we have a 5 minutes window and lateness of 5 minutes.

We have the following events in the logs
10:00:01 PM > Already pushed to Kafka
10:00:30 PM > Already pushed to Kafka
10:01:00 PM > Already pushed to Kafka
10:03:45 PM > Already pushed to Kafka
10:04:00 PM > Log agent crashed for 30 minutes not delivered to Kafla
yet
10:05:10 PM > Pushed to Kafka cause I came from a log agent that isn't
dead.

Flink window of 10:00:00
10:00:01 PM > Received
10:00:30 PM > Received
10:01:00 PM > Received
10:03:45 PM > Received
10:04:00 PM > Still nothing

Flink window of 10:00:00 5 lateness minutes are up.
10:00:01 PM > Counted
10:00:30 PM > Counted
10:01:00 PM > Counted
10:03:45 PM > Counted
10:04:00 PM > Still nothing

Flink window of 10:05:00 started
10:05:10 PM.> I'm new cause I came from a log agent that isn't dead.
10:04:00 PM > Still nothing

Flink window of 10:05:00 5 lateness minutes are up.
10:05:10 PM.> I have been counted, I'm happy!
10:04:00 PM > Still nothing

And so on...

Flink window of 10:30:00 started
10:04:00 PM > Hi guys, sorry I'm late 30 minutes, I ran into log agent
problems. Sorry you are late, you missed the Flink bus.


On Fri, 26 Nov 2021 at 10:53, John Smith  wrote:

> Ok,
>
> So processing time we get 100% accuracy because we don't care when the
> event comes, we just count and move along.
>
> As for event time processing, what I meant to say is if for example if the
> log shipper is late at pushing events into Kafka, Flink will not notice
> this, the watermarks will keep watermarking. So given that, let's say we
> have a window of 5 minutes and a lateness of 5 minutes, it means we will
> see counts on the "dashboard" every 10 minutes. But say the log shipper
> fails/falls behind for 30 minutes or more, the Flink Kafka consumer will
> simply not see any events and it will continue chugging along, after 30
> minutes a late event comes in at 2 windows already too late, that event is
> discarded.
>
> Or did I miss the point on the last part?
>
>
>
> On Fri, 26 Nov 2021 at 09:38, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Actually not, because processing-time does not matter at all.
>>
>> Event-time timers are always compared to watermark-time progress.
>>
>> If system happens to be compromised for (say) 4 hours, also watermarks
>> won’t progress, hence the windows get not evicted and wait for watermarks
>> to pick up from when the system crashed.
>>
>>
>>
>> Your watermark strategy can decide how strict you handle time progress:
>>
>>- Super strict: the watermark time indicates that there will be no
>>events with an older timestamp
>>- Semi strict: you accept late events and give a time-range when this
>>can happen (still processing time put aside)
>>   - You need to configure acceptable lateness in your windowing
>>   operator
>>   - Accepted lateness implies higher overall latency
>>- Custom strategy
>>   - Use a combination of accepted lateness and a custom trigger in
>>   your windowing operator
>>   - The trigger decide when and how often window results are emitted
>>   - The following operator would the probably implement some
>>   idempotence/updating scheme for the window values
>>   - This way you get immediate low latency results and allow for
>>   later corrections if late events arrive
>>
>>
>>
>> My favorite source on this is Tyler Akidau’s book [1] and the excerpt
>> blog: [2] [3]
>>
>> I believe his code uses Beam, but the same ideas can be implemented
>> directly in Flink API
>>
>>
>>
>> [1] https://www.oreilly.com/library/view/streaming-systems/9781491983867/
>>
>> [2] https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/
>>
>> [3] https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/
>>
>>
>>
>> … happy to discuss further 
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> *From:* John Smith 
>> *Sent:* Freitag, 26. November 2021 14:09
>> *To:* Schwalbe Matthias 
>> *Cc:* Caizhi Weng ; user 
>> *Subject:* Re: Windows and data loss.
>>
>>
>>
>> But if we use event time, if a failure happens potentially those events
>> can't be delivered in their windo they will be dropped if they come after
>> the lateness and watermark settings no?
>>
>>
>>
>>
>>
>> On Fri, 26 Nov 20

Re: Windows and data loss.

2021-11-26 Thread John Smith
Ok,

So processing time we get 100% accuracy because we don't care when the
event comes, we just count and move along.

As for event time processing, what I meant to say is if for example if the
log shipper is late at pushing events into Kafka, Flink will not notice
this, the watermarks will keep watermarking. So given that, let's say we
have a window of 5 minutes and a lateness of 5 minutes, it means we will
see counts on the "dashboard" every 10 minutes. But say the log shipper
fails/falls behind for 30 minutes or more, the Flink Kafka consumer will
simply not see any events and it will continue chugging along, after 30
minutes a late event comes in at 2 windows already too late, that event is
discarded.

Or did I miss the point on the last part?



On Fri, 26 Nov 2021 at 09:38, Schwalbe Matthias 
wrote:

> Actually not, because processing-time does not matter at all.
>
> Event-time timers are always compared to watermark-time progress.
>
> If system happens to be compromised for (say) 4 hours, also watermarks
> won’t progress, hence the windows get not evicted and wait for watermarks
> to pick up from when the system crashed.
>
>
>
> Your watermark strategy can decide how strict you handle time progress:
>
>- Super strict: the watermark time indicates that there will be no
>events with an older timestamp
>- Semi strict: you accept late events and give a time-range when this
>can happen (still processing time put aside)
>   - You need to configure acceptable lateness in your windowing
>   operator
>   - Accepted lateness implies higher overall latency
>- Custom strategy
>   - Use a combination of accepted lateness and a custom trigger in
>   your windowing operator
>   - The trigger decide when and how often window results are emitted
>   - The following operator would the probably implement some
>   idempotence/updating scheme for the window values
>   - This way you get immediate low latency results and allow for
>   later corrections if late events arrive
>
>
>
> My favorite source on this is Tyler Akidau’s book [1] and the excerpt
> blog: [2] [3]
>
> I believe his code uses Beam, but the same ideas can be implemented
> directly in Flink API
>
>
>
> [1] https://www.oreilly.com/library/view/streaming-systems/9781491983867/
>
> [2] https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/
>
> [3] https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/
>
>
>
> … happy to discuss further 
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* John Smith 
> *Sent:* Freitag, 26. November 2021 14:09
> *To:* Schwalbe Matthias 
> *Cc:* Caizhi Weng ; user 
> *Subject:* Re: Windows and data loss.
>
>
>
> But if we use event time, if a failure happens potentially those events
> can't be delivered in their windo they will be dropped if they come after
> the lateness and watermark settings no?
>
>
>
>
>
> On Fri, 26 Nov 2021 at 02:35, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi John,
>
>
>
> Going with processing time is perfectly sound if the results meet your
> requirements and you can easily live with events misplaced into the wrong
> time window.
>
> This is also quite a bit cheaper resource-wise.
>
> However you might want to keep in mind situations when things break down
> (network interrupt, datacenter flooded etc. ). With processing time
> events count into the time window when processed, with event time they
> count into the time window when originally created a the source … even if
> processed much later …
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* John Smith 
> *Sent:* Freitag, 26. November 2021 02:55
> *To:* Schwalbe Matthias 
> *Cc:* Caizhi Weng ; user 
> *Subject:* Re: Windows and data loss.
>
>
>
> Well what I'm thinking for 100% accuracy no data loss just to base the
> count on processing time. So whatever arrives in that window is counted. If
> I get some events of the "current" window late and they go into another
> window it's ok.
>
> My pipeline is like so
>
> browser(user)->REST API-->log file-->Filebeat-->Kafka (18
> partitions)->flink->destination
>
> Filebeat inserts into Kafka it's kindof a big bucket of "logs" which I use
> flink to filter the specific app and do the counts. The logs are round
> robin into the topic/partitions. Where I FORSEE a delay is Filebeat can't
> push fast enough into Kafka AND/OR the flink consumer has not read all
> events for that window from all partitions.
>
>
>
> On Thu, 25 Nov 2021 at 11:28, Schwalbe

Re: Windows and data loss.

2021-11-26 Thread John Smith
But if we use event time, if a failure happens potentially those events
can't be delivered in their windo they will be dropped if they come after
the lateness and watermark settings no?


On Fri, 26 Nov 2021 at 02:35, Schwalbe Matthias 
wrote:

> Hi John,
>
>
>
> Going with processing time is perfectly sound if the results meet your
> requirements and you can easily live with events misplaced into the wrong
> time window.
>
> This is also quite a bit cheaper resource-wise.
>
> However you might want to keep in mind situations when things break down
> (network interrupt, datacenter flooded etc. ). With processing time
> events count into the time window when processed, with event time they
> count into the time window when originally created a the source … even if
> processed much later …
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* John Smith 
> *Sent:* Freitag, 26. November 2021 02:55
> *To:* Schwalbe Matthias 
> *Cc:* Caizhi Weng ; user 
> *Subject:* Re: Windows and data loss.
>
>
>
> Well what I'm thinking for 100% accuracy no data loss just to base the
> count on processing time. So whatever arrives in that window is counted. If
> I get some events of the "current" window late and they go into another
> window it's ok.
>
> My pipeline is like so
>
> browser(user)->REST API-->log file-->Filebeat-->Kafka (18
> partitions)->flink->destination
>
> Filebeat inserts into Kafka it's kindof a big bucket of "logs" which I use
> flink to filter the specific app and do the counts. The logs are round
> robin into the topic/partitions. Where I FORSEE a delay is Filebeat can't
> push fast enough into Kafka AND/OR the flink consumer has not read all
> events for that window from all partitions.
>
>
>
> On Thu, 25 Nov 2021 at 11:28, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi John,
>
>
>
> … just a short hint:
>
> With datastream API you can
>
>- hand-craft a trigger that decides when an how often emit
>intermediate, punctual and late window results, and when to evict the
>window and stop processing late events
>- in order to process late event you also need to specify for how long
>you will extend the window processing (or is that done in the trigger … I
>    don’t remember right know)
>- overall window state grows, if you extend window processing to after
>it is finished …
>
>
>
> Hope this helps 
>
>
>
> Thias
>
>
>
> *From:* Caizhi Weng 
> *Sent:* Donnerstag, 25. November 2021 02:56
> *To:* John Smith 
> *Cc:* user 
> *Subject:* Re: Windows and data loss.
>
>
>
> Hi!
>
>
>
> Are you using the datastream API or the table / SQL API? I don't know if
> datastream API has this functionality, but in table / SQL API we have the
> following configurations [1].
>
>- table.exec.emit.late-fire.enabled: Emit window results for late
>records;
>- table.exec.emit.late-fire.delay: How often shall we emit results for
>late records (for example, once per 10 minutes or for every record).
>
>
>
> [1]
> https://github.com/apache/flink/blob/601ef3b3bce040264daa3aedcb9d98ead8303485/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L214
>
>
>
> John Smith  于2021年11月25日周四 上午12:45写道:
>
> Hi I understand that when using windows and having set the watermarks and
> lateness configs. That if an event comes late it is lost and we can
> output it to side output.
>
> But wondering is there a way to do it without the loss?
>
> I'm guessing an "all" window with a custom trigger that just fires X
> period and whatever is on that bucket is in that bucket?
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return 

Re: Windows and data loss.

2021-11-25 Thread John Smith
Well what I'm thinking for 100% accuracy no data loss just to base the
count on processing time. So whatever arrives in that window is counted. If
I get some events of the "current" window late and they go into another
window it's ok.

My pipeline is like so

browser(user)->REST API-->log file-->Filebeat-->Kafka (18
partitions)->flink->destination

Filebeat inserts into Kafka it's kindof a big bucket of "logs" which I use
flink to filter the specific app and do the counts. The logs are round
robin into the topic/partitions. Where I FORSEE a delay is Filebeat can't
push fast enough into Kafka AND/OR the flink consumer has not read all
events for that window from all partitions.

On Thu, 25 Nov 2021 at 11:28, Schwalbe Matthias 
wrote:

> Hi John,
>
>
>
> … just a short hint:
>
> With datastream API you can
>
>- hand-craft a trigger that decides when an how often emit
>intermediate, punctual and late window results, and when to evict the
>window and stop processing late events
>- in order to process late event you also need to specify for how long
>you will extend the window processing (or is that done in the trigger … I
>don’t remember right know)
>- overall window state grows, if you extend window processing to after
>it is finished …
>
>
>
> Hope this helps 
>
>
>
> Thias
>
>
>
> *From:* Caizhi Weng 
> *Sent:* Donnerstag, 25. November 2021 02:56
> *To:* John Smith 
> *Cc:* user 
> *Subject:* Re: Windows and data loss.
>
>
>
> Hi!
>
>
>
> Are you using the datastream API or the table / SQL API? I don't know if
> datastream API has this functionality, but in table / SQL API we have the
> following configurations [1].
>
>- table.exec.emit.late-fire.enabled: Emit window results for late
>records;
>- table.exec.emit.late-fire.delay: How often shall we emit results for
>late records (for example, once per 10 minutes or for every record).
>
>
>
> [1]
> https://github.com/apache/flink/blob/601ef3b3bce040264daa3aedcb9d98ead8303485/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L214
>
>
>
> John Smith  于2021年11月25日周四 上午12:45写道:
>
> Hi I understand that when using windows and having set the watermarks and
> lateness configs. That if an event comes late it is lost and we can
> output it to side output.
>
> But wondering is there a way to do it without the loss?
>
> I'm guessing an "all" window with a custom trigger that just fires X
> period and whatever is on that bucket is in that bucket?
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Re: Windows and data loss.

2021-11-25 Thread John Smith
Thanks. Using, data streaming.

On Wed, 24 Nov 2021 at 20:56, Caizhi Weng  wrote:

> Hi!
>
> Are you using the datastream API or the table / SQL API? I don't know if
> datastream API has this functionality, but in table / SQL API we have the
> following configurations [1].
>
>- table.exec.emit.late-fire.enabled: Emit window results for late
>records;
>- table.exec.emit.late-fire.delay: How often shall we emit results for
>late records (for example, once per 10 minutes or for every record).
>
>
> [1]
> https://github.com/apache/flink/blob/601ef3b3bce040264daa3aedcb9d98ead8303485/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L214
>
> John Smith  于2021年11月25日周四 上午12:45写道:
>
>> Hi I understand that when using windows and having set the watermarks and
>> lateness configs. That if an event comes late it is lost and we can
>> output it to side output.
>>
>> But wondering is there a way to do it without the loss?
>>
>> I'm guessing an "all" window with a custom trigger that just fires X
>> period and whatever is on that bucket is in that bucket?
>>
>


Windows and data loss.

2021-11-24 Thread John Smith
Hi I understand that when using windows and having set the watermarks and
lateness configs. That if an event comes late it is lost and we can
output it to side output.

But wondering is there a way to do it without the loss?

I'm guessing an "all" window with a custom trigger that just fires X period
and whatever is on that bucket is in that bucket?


Re: Recommended metaspace memory config for 16GB hosts.

2021-11-23 Thread John Smith
Well the hosts have 16GB.

If there is a "bug" with classloading... Then for now I can only hope to
increase the metaspace size so...

If the host has 16GB

Can I set the Java heap to say 12GB and the Metaspace to 2GB and leave 2GB
for the OS?
Or maybe 10GB for heap and 2GB for Meta which leaves 4GB for everything
else including the OS?

This is from my live taskmanager

taskmanager.memory.flink.size: 10240m
taskmanager.memory.jvm-metaspace.size: 1024m
taskmanager.numberOfTaskSlots: 12

Physical Memory:15.7 GB
JVM Heap Size:4.88 GB
Flink Managed Memory:4.00 GB

JVM (Heap/Non-Heap)
Type
Committed
Used
Maximum
Heap 4.88 GB 2.16 GB 4.88 GB
Non-Heap 416 MB 404 MB 2.23 GB
Total 5.28 GB 2.55 GB 7.10 GB
Outside JVM
Type
Count
Used
Capacity
Direct 32,836 1.01 GB 1.01 GB
Mapped 0 0 B 0 B



On Tue, 23 Nov 2021 at 02:23, Matthias Pohl  wrote:

> In general, running out of memory in the Metaspace pool indicates some bug
> related to the classloaders. Have you considered upgrading to new versions
> of Flink and other parts of your pipeline? Otherwise, you might want to
> create a heap dump and analyze that one [1]. This analysis might reveal
> some pointers to what is causing the problem.
>
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/#analyzing-out-of-memory-problems
>
> On Mon, Nov 22, 2021 at 8:34 PM John Smith  wrote:
>
>> Hi thanks. I know, I already mentioned that I put 1024, see config above.
>> But my question is how much? I still get the message once a while. It also
>> seems that if a job restarts a few times it happens... My jobs aren't
>> complicated. They use Kafka, some of them JDBC and the JDBC driver to push
>> to DB. Right now I use flink for ETL
>>
>> Kafka -> JSon Validation (Jackson) -> filter -> JDBC to database.
>>
>> On Mon, 22 Nov 2021 at 10:24, Matthias Pohl 
>> wrote:
>>
>>> Hi John,
>>> have you had a look at the memory model for Flink 1.10? [1] Based on the
>>> documentation, you could try increasing the Metaspace size independently of
>>> the Flink memory usage (i.e. flink.size). The heap Size is a part of the
>>> overall Flink memory. I hope that helps.
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.10/ops/memory/mem_detail.html
>>>
>>> On Mon, Nov 22, 2021 at 3:58 PM John Smith 
>>> wrote:
>>>
>>>> Hi, has anyone seen this?
>>>>
>>>> On Tue, 16 Nov 2021 at 14:14, John Smith 
>>>> wrote:
>>>>
>>>>> Hi running Flink 1.10
>>>>>
>>>>> I have
>>>>> - 3 job nodes 8GB memory total
>>>>> - jobmanager.heap.size: 6144m
>>>>>
>>>>> - 3 task nodes 16GB memory total
>>>>> - taskmanager.memory.flink.size: 10240m
>>>>> - taskmanager.memory.jvm-metaspace.size: 1024m <--- This still
>>>>> cause metaspace errors once a while, can I go higher do I need to lower 
>>>>> the
>>>>> 10GB above?
>>>>>
>>>>> The task nodes on the UI are reporting:
>>>>> - Physical Memory:15.7 GBJVM
>>>>> - Heap Size:4.88 GB <--- I'm guess this current used heap size and
>>>>> not the mak of 10GB set above?
>>>>> - Flink Managed Memory:4.00 GB
>>>>>
>>>>


Re: Recommended metaspace memory config for 16GB hosts.

2021-11-22 Thread John Smith
Hi thanks. I know, I already mentioned that I put 1024, see config above.
But my question is how much? I still get the message once a while. It also
seems that if a job restarts a few times it happens... My jobs aren't
complicated. They use Kafka, some of them JDBC and the JDBC driver to push
to DB. Right now I use flink for ETL

Kafka -> JSon Validation (Jackson) -> filter -> JDBC to database.

On Mon, 22 Nov 2021 at 10:24, Matthias Pohl  wrote:

> Hi John,
> have you had a look at the memory model for Flink 1.10? [1] Based on the
> documentation, you could try increasing the Metaspace size independently of
> the Flink memory usage (i.e. flink.size). The heap Size is a part of the
> overall Flink memory. I hope that helps.
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.10/ops/memory/mem_detail.html
>
> On Mon, Nov 22, 2021 at 3:58 PM John Smith  wrote:
>
>> Hi, has anyone seen this?
>>
>> On Tue, 16 Nov 2021 at 14:14, John Smith  wrote:
>>
>>> Hi running Flink 1.10
>>>
>>> I have
>>> - 3 job nodes 8GB memory total
>>> - jobmanager.heap.size: 6144m
>>>
>>> - 3 task nodes 16GB memory total
>>> - taskmanager.memory.flink.size: 10240m
>>> - taskmanager.memory.jvm-metaspace.size: 1024m <--- This still cause
>>> metaspace errors once a while, can I go higher do I need to lower the 10GB
>>> above?
>>>
>>> The task nodes on the UI are reporting:
>>> - Physical Memory:15.7 GBJVM
>>> - Heap Size:4.88 GB <--- I'm guess this current used heap size and
>>> not the mak of 10GB set above?
>>> - Flink Managed Memory:4.00 GB
>>>
>>


Re: Recommended metaspace memory config for 16GB hosts.

2021-11-22 Thread John Smith
Hi, has anyone seen this?

On Tue, 16 Nov 2021 at 14:14, John Smith  wrote:

> Hi running Flink 1.10
>
> I have
> - 3 job nodes 8GB memory total
> - jobmanager.heap.size: 6144m
>
> - 3 task nodes 16GB memory total
> - taskmanager.memory.flink.size: 10240m
> - taskmanager.memory.jvm-metaspace.size: 1024m <--- This still cause
> metaspace errors once a while, can I go higher do I need to lower the 10GB
> above?
>
> The task nodes on the UI are reporting:
> - Physical Memory:15.7 GBJVM
> - Heap Size:4.88 GB <--- I'm guess this current used heap size and not
> the mak of 10GB set above?
> - Flink Managed Memory:4.00 GB
>
>
>


Recommended metaspace memory config for 16GB hosts.

2021-11-16 Thread John Smith
Hi running Flink 1.10

I have
- 3 job nodes 8GB memory total
- jobmanager.heap.size: 6144m

- 3 task nodes 16GB memory total
- taskmanager.memory.flink.size: 10240m
- taskmanager.memory.jvm-metaspace.size: 1024m <--- This still cause
metaspace errors once a while, can I go higher do I need to lower the 10GB
above?

The task nodes on the UI are reporting:
- Physical Memory:15.7 GBJVM
- Heap Size:4.88 GB <--- I'm guess this current used heap size and not
the mak of 10GB set above?
- Flink Managed Memory:4.00 GB


Re: What is Could not retrieve file from transient blob store?

2021-11-04 Thread John Smith
(AbstractTaskManagerFileHandler.java:135)
at
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Local file
/tmp/blobStore-9cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b
does not exist and failed to copy from blob store.
at
org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:516)
at
org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:369)
at
org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.lambda$respondToRequest$0(AbstractTaskManagerFileHandler.java:133)
... 9 more
2021-11-02 23:47:57,865 WARN  akka.remote.transport.netty.NettyTransport
 - Remote connection to [xxjob-0001/xx.72:37007]
failed with java.io.IOException: Connection reset by peer
2021-11-02 23:47:57,912 WARN  akka.remote.ReliableDeliverySupervisor
 - Association with remote system
[akka.tcp://flink@xxjob-0001:37007] has failed, address is now gated
for [50] ms. Reason: [Disassociated]
2021-11-02 23:53:41,565 WARN  akka.remote.transport.netty.NettyTransport
 - Remote connection to [xxjob-0001/xx.72:42961]
failed with java.io.IOException: Connection reset by peer
2021-11-02 23:53:41,571 WARN  akka.remote.ReliableDeliverySupervisor
 - Association with remote system
[akka.tcp://flink-metrics@xxjob-0001:42961] has failed, address is now
gated for [50] ms. Reason: [Disassociated]

On Thu, 4 Nov 2021 at 03:45, Guowei Ma  wrote:

> >>>Ok I missed the log below. I guess when the task manager was stopped
> this happened.
> I think if the TM stopped you also would not get the log. But It will
> throw another "UnknownTaskExecutorException", which would include something
> like “No TaskExecutor registered under ”.
>
> >>> But I guess it's ok and not a big issue???
> Does this happen continuously?
>
> Best,
> Guowei
>
>
> On Thu, Nov 4, 2021 at 12:39 AM John Smith  wrote:
>
>> Ok I missed the log below. I guess when the task manager was stopped this
>> happened.
>>
>> I attached the full sequence. But I guess it's ok and not a big issue???
>>
>>
>> 2021-11-02 23:20:22,682 ERROR
>> org.apache.flink.runtime.rest.handler.taskmanager.
>> TaskManagerLogFileHandler - Failed to transfer file from TaskExecutor 7e1
>> b7db5918004e4160fdecec1bbdad7.
>> java.util.concurrent.CompletionException: org.apache.flink.util.
>> FlinkException: Could not retrieve file from transient blob store.
>> at org.apache.flink.runtime.rest.handler.taskmanager.
>> AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
>> AbstractTaskManagerFileHandler.java:135)
>> at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
>> .java:670)
>> at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
>> CompletableFuture.java:646)
>> at java.util.concurrent.CompletableFuture$Completion.run(
>> CompletableFuture.java:456)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .run(NioEventLoop.java:515)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>> at org.apache.flink.shaded.netty4.io.netty.util.internal.
>> ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not retrieve file
>> from transient blob store

Re: What is Could not retrieve file from transient blob store?

2021-11-03 Thread John Smith
14/docs/deployment/config/#blob-storage-directory
>
>
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 7:50 AM John Smith  wrote:
>
>> Hi running Flink 1.10.0 With 3 zookeepers, 3 job nodes and 3 task nodes.
>> and I saw this exception on the job node logs...
>> 2021-11-02 23:20:22,703 ERROR
>> org.apache.flink.runtime.rest.handler.taskmanager.
>> TaskManagerLogFileHandler - Unhandled exception.
>> org.apache.flink.util.FlinkException: Could not retrieve file from
>> transient blob store.
>> at org.apache.flink.runtime.rest.handler.taskmanager.
>> AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
>> AbstractTaskManagerFileHandler.java:135)
>> at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
>> .java:670)
>> at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
>> CompletableFuture.java:646)
>> at java.util.concurrent.CompletableFuture$Completion.run(
>> CompletableFuture.java:456)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .run(NioEventLoop.java:515)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>> at org.apache.flink.shaded.netty4.io.netty.util.internal.
>> ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.FileNotFoundException: Local file /tmp/blobStore-9
>> cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3
>> c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b
>> does not exist and failed to copy from blob store.
>> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(
>> BlobServer.java:516)
>> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(
>> BlobServer.java:444)
>> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:
>> 369)
>> at org.apache.flink.runtime.rest.handler.taskmanager.
>> AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
>> AbstractTaskManagerFileHandler.java:133)
>> ... 9 more
>>
>


What is Could not retrieve file from transient blob store?

2021-11-02 Thread John Smith
Hi running Flink 1.10.0 With 3 zookeepers, 3 job nodes and 3 task nodes.
and I saw this exception on the job node logs...
2021-11-02 23:20:22,703 ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
- Unhandled exception.
org.apache.flink.util.FlinkException: Could not retrieve file from transient
blob store.
at org.apache.flink.runtime.rest.handler.taskmanager.
AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
AbstractTaskManagerFileHandler.java:135)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
.java:670)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
CompletableFuture.java:646)
at java.util.concurrent.CompletableFuture$Completion.run(
CompletableFuture.java:456)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(
NioEventLoop.java:515)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at org.apache.flink.shaded.netty4.io.netty.util.internal.
ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Local file /tmp/blobStore-9
cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3
c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b does
not exist and failed to copy from blob store.
at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
.java:516)
at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
.java:444)
at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:369)
at org.apache.flink.runtime.rest.handler.taskmanager.
AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
AbstractTaskManagerFileHandler.java:133)
... 9 more


Stream join with (changing) dimension in Kafka

2021-09-22 Thread John Smith
Hi,

I'm trying to use temporal join in Table API to enrich a stream of pageview
events with a slowly changing dimension of user information.
The pageview events are in a kafka topic called *pageviews* and the user
information are in a kafka topic keyed by *userid* and whenever there is an
updated user event, it is appended to the *users* topic.
I declare a table on the pageview topic with watermark strategy of
*WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))* and a
table on the users topic with watermark strategy of
* WatermarkStrategy.forMonotonousTimestamps().*

Here is the code for the temporal join:

Table pv = getPageview(env, tableEnv, properties).
select(
$("timestamp").as("pv_ts"),
$("userid").as("pv_userid"),
$("pageid").as("pv_pageid")
);
Table usr = getUsers(env, tableEnv, properties)
.select(
$("timestamp").as("u_ts"),
$("userid").as("u_userid"),
$("regionid"),
$("gender")
);

TemporalTableFunction userFunction =
usr.createTemporalTableFunction($("u_ts"), $("u_userid"));
tableEnv.createTemporaryFunction("usrFun", userFunction);

Table enrichedPV = pv.joinLateral(call("usrFun", $("pv_ts")),
$("pv_userid").isEqual($("u_userid")));

enrichedPV.execute().print();

When I run this, I get result like the following which only is triggered
when there are new messages pushed into both pageviews and users topics:

++-+++-++++
| op |   pv_ts |  pv_userid |
   pv_pageid |u_ts |
u_userid |   regionid | gender |
++-+++-++++
| +I | 2021-09-22 08:28:05.346 | User_8 |
 Page_99 | 2021-09-22 08:28:04.769 |
User_8 |   Region_1 |  OTHER |
| +I | 2021-09-22 08:28:12.377 | User_3 |
 Page_88 | 2021-09-22 08:28:08.823 |
User_3 |   Region_8 | FEMALE |
| +I | 2021-09-22 08:28:15.385 | User_7 |
 Page_73 | 2021-09-22 08:28:07.817 |
User_7 |   Region_9 |  OTHER |
| +I | 2021-09-22 08:28:16.391 | User_7 |
 Page_97 | 2021-09-22 08:28:07.817 |
User_7 |   Region_9 |  OTHER |
| +I | 2021-09-22 08:28:17.396 | User_7 |
 Page_43 | 2021-09-22 08:28:07.817 |
User_7 |   Region_9 |  OTHER |
| +I | 2021-09-22 08:28:18.400 | User_6 |
 Page_43 | 2021-09-22 08:28:15.854 |
User_6 |   Region_5 |  OTHER |

However, I want to trigger a result whenever a new pageview message arrives
and not wait on the user side.
Do I have any obvious mistake in my code that I cannot get this behavior?
Also is there any code example that I can try where the main stream is
enriched when there is a new event regardless of having any new event in
the dimension side? Flink documentation on temporal join especially for
TableAPI is really thin!

Thanks in advance.


Re: Triggers for windowed aggregations in Table API

2021-09-03 Thread John Smith
Thanks Guowei and Caizhi.
As Guowei noted, I am using Table API and it seems that it does not support
triggers at the moment. Is there a plan to support custom triggers in Table
API/SQL too?
Also, if I follow Guowei's suggestion, should I use DataStream for other
parts of the aggregate computation too or is there a way to create a
GroupedWindowedTable from the DataStream?

Thanks,

On Thu, Sep 2, 2021 at 9:24 PM Guowei Ma  wrote:

> Hi, John
>
> I agree with Caizhi that you might need to customize a window trigger. But
> there is a small addition, you need to convert Table to DataStream first.
> Then you can customize the trigger of the window. Because as far as I
> know, Table API does not support custom windows yet. For details on how to
> convert, you can refer to [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration
> Best,
> Guowei
>
>
> On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng  wrote:
>
>> Hi!
>>
>> You might want to use your custom trigger to achieve this.
>>
>> Tumble windows are using EventTimeTrigger by default. Flink has another
>> built-in trigger called CountTrigger but it only fires for every X records,
>> ignoring the event time completely. You might want to create your own
>> trigger to combine the two, or more specifically, combine
>> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>>
>> For more about custom triggers see
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>>
>> John Smith  于2021年9月3日周五 上午2:00写道:
>>
>>> Hi,
>>>
>>> Sorry if this has been answered previously but I couldn't find any
>>> answer for the question and would appreciate any help.
>>> Context:
>>> Let's say I have a log stream in Kafka where message values have an *id*
>>> field along with a few other fields. I want to count the number of messages
>>> for each id for a tumbling window of* ten minutes *and if the count for
>>> any id in the given window is higher than 5, I want to write the message
>>> into the sink topic. However, I don't want to wait until the end of the 10
>>> minute window to emit the result and want to immediately emit the result
>>> when the count is more than 5 for an id in the window. For example, if I
>>> see 6 messages in the first minute for an id, I want to trigger a write
>>> with the count of 6 in the sink topic immediately and not wait the whole 10
>>> minutes.
>>> The following code does the aggregation but emits the results at the end
>>> of the window. How can I trigger the emitting result earlier?
>>>
>>> final Table resultTable = sourceTable
>>> .select( $("id")
>>> , $("key")
>>> 
>>> .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w")   )
>>> .groupBy($("w"), $("id"))
>>> .select($("w").start().as("WindowStart"), $("id"), 
>>> $("key").count().as("count"))
>>> ;
>>>
>>> resultTable.execute().print();
>>>
>>>
>>> Thanks in advance!
>>>
>>>


Triggers for windowed aggregations in Table API

2021-09-02 Thread John Smith
Hi,

Sorry if this has been answered previously but I couldn't find any answer
for the question and would appreciate any help.
Context:
Let's say I have a log stream in Kafka where message values have an *id*
field along with a few other fields. I want to count the number of messages
for each id for a tumbling window of* ten minutes *and if the count for any
id in the given window is higher than 5, I want to write the message into
the sink topic. However, I don't want to wait until the end of the 10
minute window to emit the result and want to immediately emit the result
when the count is more than 5 for an id in the window. For example, if I
see 6 messages in the first minute for an id, I want to trigger a write
with the count of 6 in the sink topic immediately and not wait the whole 10
minutes.
The following code does the aggregation but emits the results at the end of
the window. How can I trigger the emitting result earlier?

final Table resultTable = sourceTable
.select( $("id")
, $("key")

.window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w")   )
.groupBy($("w"), $("id"))
.select($("w").start().as("WindowStart"), $("id"),
$("key").count().as("count"))
;

resultTable.execute().print();


Thanks in advance!


Re: Convert DataStream to Table with the same columns in Row

2021-05-16 Thread John Smith
Thanks for your help Timo and Fabian,
Got it working with Timo’s suggestion.

On Fri, May 14, 2021 at 6:14 AM Fabian Paul 
wrote:

> Hi John,
>
> Can you maybe share more code about how you build the DataStrean?
> It would also be good to know against which Flink version you are testing.
> I just
> tried the following code against the current master and:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> DataStream rowStream = env.fromElements(Row.of(1, "test1"), Row.of(2, 
> "test2"));
> Table t = tableEnv.fromDataStream(rowStream);
> t.execute().print();
>
> seems to work fine.
> ++-++
> | op |  f0 | f1 |
> ++-++
> | +I |   1 |  test1 |
> | +I |   2 |  test2 |
> ++-+----+
>
> Best,
> Fabian
>
>
> On 14. May 2021, at 09:01, John Smith  wrote:
>
> Hi,
>
> Sorry if this is a duplicate question but I couldn't find any answer to
> my question.
> I am trying to convert a DataStream into a Table where the columns in
> the Row objects in the DataStream will become columns of the Table.
> Here is how I tried to do it:
>
> //Creating a DataStream of Row type. Let's assume the Row type has 3
> columns:
> // (c1 BIGINT, c2 String, c3 String)
> DataStream rowStream = 
>
> // Convert it to a Table
> Table t = tableEnv.fromDataStream(rowStream)
>
> // Print the table
> t.execute().print();
>
> However, when I print the table it has one column of type Row instead of
> three columns (c1, c2, c3).
>
> What I see in the print result is:
>
> +++
> | op |  f0   |
> +++
> | +I | +I{c1=1620968140951, ... |
>
> What I would like to see is:
>
> ++---+
> | op |c1  |   c2   |c3   |
> ++---+
> |   +I |  1620968140951 |  'foo'  |  'bar'  |
>
> How can I convert the DataStream to a table that has the same columns as
> the columns in Row in the DataStream.
> Would really appreciate it if anyone can share a code snippet for the
> above example.
>
> Thanks,
> JS.
>
>
>


Convert DataStream to Table with the same columns in Row

2021-05-14 Thread John Smith
Hi,

Sorry if this is a duplicate question but I couldn't find any answer to
my question.
I am trying to convert a DataStream into a Table where the columns in
the Row objects in the DataStream will become columns of the Table.
Here is how I tried to do it:

//Creating a DataStream of Row type. Let's assume the Row type has 3
columns:
// (c1 BIGINT, c2 String, c3 String)
DataStream rowStream = 

// Convert it to a Table
Table t = tableEnv.fromDataStream(rowStream)

// Print the table
t.execute().print();

However, when I print the table it has one column of type Row instead of
three columns (c1, c2, c3).

What I see in the print result is:

+++
| op |  f0   |
+++
| +I | +I{c1=1620968140951, ... |

What I would like to see is:

++---+
| op |c1  |   c2   |c3   |
++---+
|   +I |  1620968140951 |  'foo'  |  'bar'  |

How can I convert the DataStream to a table that has the same columns as
the columns in Row in the DataStream.
Would really appreciate it if anyone can share a code snippet for the above
example.

Thanks,
JS.


Log rollover for logs.

2021-04-27 Thread John Smith
Hi, I'm running flink as a systemd service with...

[Service]
Type=forking
WorkingDirectory=/opt/flink
User=flink
Group=flink
ExecStart=/opt/flink/bin/taskmanager.sh start
ExecStop=/opt/flink/bin/taskmanager.sh stop
TimeoutSec=30
Restart=on-failure

My log4j.porperties file is at /opt/flink/conf

Is it as simple as just setting the log rollover in there? Cause sometimes
with certain services like zookeeper some env variable overrides etc... So
just wondering if it's that straight forward with flink.


Re: Too man y checkpoint folders kept for externalized retention.

2021-04-25 Thread John Smith
No. But I decided to disable it finally

On Sun., Apr. 25, 2021, 5:14 a.m. Yun Gao,  wrote:

> Hi John,
>
> Logically the maximum retained checkpoints are configured
> by state.checkpoints.num-retained [1]. Have you configured
> this option?
>
>
> Best,
> Yun
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#state-checkpoints-num-retained
>
>
>
> --
> Sender:John Smith
> Date:2021/04/24 01:41:41
> Recipient:user
> Theme:Too man y checkpoint folders kept for externalized retention.
>
> Hi running 1.10.0.
>
> Just curious is this specific to externalized retention or checkpointing
> in general.
>
> I see my checkpoint folder counting thousands of chk-x folders.
>
> If using default checkpoint or NONE externalized checkpointing does the
> count of chk- folders grow indefinitely until the job is killed or it
> retains up to certain amount?
>
> Thanks
>
>


Too man y checkpoint folders kept for externalized retention.

2021-04-23 Thread John Smith
Hi running 1.10.0.

Just curious is this specific to externalized retention or checkpointing in
general.

I see my checkpoint folder counting thousands of chk-x folders.

If using default checkpoint or NONE externalized checkpointing does the
count of chk- folders grow indefinitely until the job is killed or it
retains up to certain amount?

Thanks


CDC for MS SQL Server

2021-02-11 Thread John Smith
I see you have native connectors for Postgress and MySql. I also See a
debezium connector.

1- I'm guessing if we want to uses MS SQL Server CDC we need to use the
Deebezium connector?
2- If so, do we need to have the full debezium service running. Or it's as
simple as the MySql and Postgres connectors?


Re: What does Kafka Error sending fetch request mean for the Kafka source?

2020-11-03 Thread John Smith
Kafka source is configured as AT_LEAST_ONCE and the JDBC sink handles
duplicates with unique key/constraint and logs duplicates in a separate SQL
table. And essentially it gives us EXACTLY_ONCE semantics.

That's not a problem, it works great!

1- I was curious if that specific Kafka message was the cause of the
duplicates, but if I understand correctly Becket it's not the source of the
duplicates and I wanted to confirm that.
2- I started monitoring checkpoints on average they are 100ms, during peak
we started seeing checkpoints takie 20s-40s+... My checkpoint is configed
as follows:
 - env.enableCheckpointing(6);
 -
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
 -
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 - env.getCheckpointConfig().setCheckpointTimeout(6);
 - env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
3- Based on above it's possible that the sink takes longer than 60seconds
sometimes...
- Looking at adjusting timeouts.
- Looking at reducing the load of the sink and reduce how long it takes
in general.

On Tue, 3 Nov 2020 at 10:49, Robert Metzger  wrote:

> How did you configure the Kafka source as at least once? Afaik the source
> is always exactly-once (as long as there aren't any restarts).
>
> Are you seeing the duplicates in the context of restarts of the Flink job?
>
> On Tue, Nov 3, 2020 at 1:54 AM John Smith  wrote:
>
>> Sorry, got confused with your reply... Does the message "Error sending
>> fetch request" cause retries/duplicates down stream or it doesn't?
>>
>> I'm guessing it's even before the source can even send anything
>> downstream...
>>
>>
>> On Sat, 31 Oct 2020 at 09:10, John Smith  wrote:
>>
>>> Hi my flow is Kafka Source -> Transform -> JDBC Sink
>>>
>>> Kafka Source is configured as at least once and JDBC prevents duplicates
>>> with unique key constraint and duplicate is logged in separate table. So
>>> the destination data is exactly once.
>>>
>>> The duplicates happen every so often, looking at check point history
>>> there was some checkpoints that failed, but the history isn't long enough
>>> to go back and look. I'm guessing I will have to adjust the checkpointing
>>> times a bit...
>>>
>>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, 
>>> wrote:
>>>
>>>> Hi John,
>>>>
>>>> The log message you saw from Kafka consumer simply means the consumer
>>>> was disconnected from the broker that FetchRequest was supposed to be sent
>>>> to. The disconnection can happen in many cases, such as broker down,
>>>> network glitches, etc. The KafkaConsumer will just reconnect and retry
>>>> sending that FetchRequest again. This won't cause duplicate messages in
>>>> KafkaConsumer or Flink. Have you enabled exactly-once semantic for your
>>>> Kafka sink? If not, the downstream might see duplicates in case of Flink
>>>> failover or occasional retry in the KafkaProducer of the Kafka sink.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith 
>>>> wrote:
>>>>
>>>>> Any thoughts this doesn't seem to create duplicates all the time or
>>>>> maybe it's unrelated as we are still seeing the message and there is no
>>>>> duplicates...
>>>>>
>>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, 
>>>>> wrote:
>>>>>
>>>>>> And yes my downstream is handling the duplicates in an idempotent way
>>>>>> so we are good on that point. But just curious what the behaviour is on 
>>>>>> the
>>>>>> source consumer when that error happens.
>>>>>>
>>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-
>>>>>>> 10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler
>>>>>>> - [Consumer clientId=consumer-2, groupId=xx-import] Error
>>>>>>> sending fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>
>>>>>>> Obviously it looks like the consumer is getting disconnected and
>>>>>>> from what it seems it's either a Kafka bug on the way it handles the 
>>>>>>> EPOCH
>>>>>>> or possibly version mismatch between client and brokers. That's fine I 
>>>>>>> can
>>>>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>>>>> what happens in terms of the source and the sink. It looks let we get
>>>>>>> duplicates on the sink and I'm guessing it's because the consumer is
>>>>>>> failing and at that point Flink stays on that checkpoint until it can
>>>>>>> reconnect and process that offset and hence the duplicates downstream?
>>>>>>>
>>>>>>


Re: What does Kafka Error sending fetch request mean for the Kafka source?

2020-11-02 Thread John Smith
Sorry, got confused with your reply... Does the message "Error sending
fetch request" cause retries/duplicates down stream or it doesn't?

I'm guessing it's even before the source can even send anything
downstream...


On Sat, 31 Oct 2020 at 09:10, John Smith  wrote:

> Hi my flow is Kafka Source -> Transform -> JDBC Sink
>
> Kafka Source is configured as at least once and JDBC prevents duplicates
> with unique key constraint and duplicate is logged in separate table. So
> the destination data is exactly once.
>
> The duplicates happen every so often, looking at check point history there
> was some checkpoints that failed, but the history isn't long enough to go
> back and look. I'm guessing I will have to adjust the checkpointing times a
> bit...
>
> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, 
> wrote:
>
>> Hi John,
>>
>> The log message you saw from Kafka consumer simply means the consumer was
>> disconnected from the broker that FetchRequest was supposed to be sent to.
>> The disconnection can happen in many cases, such as broker down, network
>> glitches, etc. The KafkaConsumer will just reconnect and retry sending that
>> FetchRequest again. This won't cause duplicate messages in KafkaConsumer or
>> Flink. Have you enabled exactly-once semantic for your Kafka sink? If not,
>> the downstream might see duplicates in case of Flink failover or occasional
>> retry in the KafkaProducer of the Kafka sink.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Thu, Oct 22, 2020 at 11:38 PM John Smith 
>> wrote:
>>
>>> Any thoughts this doesn't seem to create duplicates all the time or
>>> maybe it's unrelated as we are still seeing the message and there is no
>>> duplicates...
>>>
>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, 
>>> wrote:
>>>
>>>> And yes my downstream is handling the duplicates in an idempotent way
>>>> so we are good on that point. But just curious what the behaviour is on the
>>>> source consumer when that error happens.
>>>>
>>>> On Wed, 21 Oct 2020 at 12:04, John Smith 
>>>> wrote:
>>>>
>>>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-
>>>>> 21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [
>>>>> Consumer clientId=consumer-2, groupId=xx-import] Error sending
>>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>
>>>>> Obviously it looks like the consumer is getting disconnected and from
>>>>> what it seems it's either a Kafka bug on the way it handles the EPOCH or
>>>>> possibly version mismatch between client and brokers. That's fine I can
>>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>>> what happens in terms of the source and the sink. It looks let we get
>>>>> duplicates on the sink and I'm guessing it's because the consumer is
>>>>> failing and at that point Flink stays on that checkpoint until it can
>>>>> reconnect and process that offset and hence the duplicates downstream?
>>>>>
>>>>


  1   2   >