[jira] [Created] (FLINK-31357) If a record is deleted before being inserted, it will be deleted

2023-03-07 Thread Fangliang Liu (Jira)
Fangliang Liu created FLINK-31357:
-

 Summary: If a record is deleted before being inserted, it will be deleted
 Key: FLINK-31357
 URL: https://issues.apache.org/jira/browse/FLINK-31357
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.14.3
Reporter: Fangliang Liu






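A minimal reproduction sketch of the scenario the summary seems to describe (assumed, since the report carries no description): a changelog in which a DELETE for a key arrives before the INSERT of the same key, written to a JDBC upsert sink. The JDBC URL and table name below are placeholders.

{code}
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class DeleteBeforeInsertSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // A DELETE for key 1 is emitted before the INSERT of the same key.
        DataStream<Row> changelog =
                env.fromCollection(
                        Arrays.asList(
                                Row.ofKind(RowKind.DELETE, 1L, "a"),
                                Row.ofKind(RowKind.INSERT, 1L, "a")),
                        Types.ROW(Types.LONG, Types.STRING));
        tableEnv.createTemporaryView("changes", tableEnv.fromChangelogStream(changelog));

        // Placeholder JDBC sink keyed by f0; URL and table name are illustrative only.
        tableEnv.executeSql(
                "CREATE TABLE sink (f0 BIGINT, f1 STRING, PRIMARY KEY (f0) NOT ENFORCED) "
                        + "WITH ('connector' = 'jdbc', "
                        + "'url' = 'jdbc:mysql://localhost:3306/test', "
                        + "'table-name' = 't_sink')");

        // If the buffered DELETE/INSERT pair for the same key is reduced or reordered
        // so that the DELETE is executed last within a batch, key 1 ends up missing
        // from the target table.
        tableEnv.executeSql("INSERT INTO sink SELECT * FROM changes").await();
    }
}
{code}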


[jira] [Updated] (FLINK-31354) NettyClientServerSslTest.testValidSslConnectionAdvanced timed out

2023-03-07 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-31354:
--
Description: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46883=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8242

{code}
Test testValidSslConnectionAdvanced[SSL provider = 
JDK](org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest) is 
running.

05:15:10,904 [main] INFO  
org.apache.flink.runtime.io.network.netty.NettyConfig[] - NettyConfig 
[server address: localhost/127.0.0.1, server port: 42935, ssl enabled: true, 
memory segment size (bytes): 1024, transport type: AUTO, number of server 
threads: 1 (manual), number of client threads>
05:15:10,916 [main] INFO  
org.apache.flink.runtime.io.network.netty.NettyServer[] - Transport 
type 'auto': using EPOLL.
05:15:12,149 [main] INFO  
org.apache.flink.runtime.io.network.netty.NettyServer[] - Successful 
initialization (took 1245 ms). Listening on SocketAddress /127.0.0.1:42935.
05:15:12,150 [main] INFO  
org.apache.flink.runtime.io.network.netty.NettyClient[] - Transport 
type 'auto': using EPOLL.
05:15:13,249 [main] INFO  
org.apache.flink.runtime.io.network.netty.NettyClient[] - Successful 
initialization (took 1099 ms).
05:15:14,588 [main] ERROR 
org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest [] - 

Test testValidSslConnectionAdvanced[SSL provider = 
JDK](org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest) failed 
with:
org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandshakeTimeoutException:
 handshake timed out after 1000ms
at 
org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler$7.run(SslHandler.java:2115)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
{code}

  was:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46883=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8242

{code}
Mar 07 05:15:21 [ERROR] NettyClientServerSslTest.testValidSslConnectionAdvanced 
 Time elapsed: 3.69 s  <<< ERROR!
Mar 07 05:15:21 
org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandshakeTimeoutException:
 handshake timed out after 1000ms
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler$7.run(SslHandler.java:2115)
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
Mar 07 05:15:21 at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
Mar 07 05:15:21 at java.lang.Thread.run(Thread.java:748)
{code}


> NettyClientServerSslTest.testValidSslConnectionAdvanced timed out
> 

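For context, the 1000 ms in the failure above is the SSL handshake timeout used by the test. A minimal sketch of how that timeout can be raised, assuming the option names SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT and SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT (context only, not the proposed fix for the flaky test):

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

public class SslTimeoutConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Give slow CI machines more headroom than the 1000 ms seen in the failure.
        config.setInteger(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, 5000);
        config.setInteger(SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 5000);
        System.out.println(config);
    }
}
{code}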
[jira] [Updated] (FLINK-31356) Serialize garbled characters at checkpoint

2023-03-07 Thread John (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John updated FLINK-31356:
-
Description: 
 
{panel:title=The last checkpoint of the program was successful}
2023-03-07 08:33:16,085 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 
8b5720a4a40f50b995c97c6fe5b93079.
2023-03-07 08:33:16,918 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed 
checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 
849 ms).
2023-03-07 08:33:16,918 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
checkpoint 39126 as completed for source Source: kafkaDataStream.
2023-03-07 08:36:10,444 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to 
FAILED on container_e38_1676011848026_0012_01_02 @  (dataPort=44633).
java.lang.RuntimeException: Writing records to JDBC failed.
    at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
 ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
{panel}
{panel:title=But from this checkpoint restore, it can't be decoded}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of 
the 1 provided restore options.
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
    at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: java.io.UTFDataFormatException: malformed input around byte 32
    at java.io.DataInputStream.readUTF(DataInputStream.java:656) ~[?:1.8.0_201]
    at java.io.DataInputStream.readUTF(DataInputStream.java:564) ~[?:1.8.0_201]
    at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:379)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:155)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 

[jira] [Updated] (FLINK-31356) Serialize garbled characters at checkpoint

2023-03-07 Thread John (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John updated FLINK-31356:
-
Description: 
 
{panel:title=The last checkpoint of the program was successful}
2023-03-07 08:33:16,085 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 
8b5720a4a40f50b995c97c6fe5b93079.
2023-03-07 08:33:16,918 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed 
checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 
849 ms).
2023-03-07 08:33:16,918 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
checkpoint 39126 as completed for source Source: userKafkaDataStream.
2023-03-07 08:36:10,444 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to 
FAILED on container_e38_1676011848026_0012_01_02 @  (dataPort=44633).
java.lang.RuntimeException: Writing records to JDBC failed.
    at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
 ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
{panel}
{panel:title=But from this checkpoint restore, it can't be decoded}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of 
the 1 provided restore options.
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
    at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: java.io.UTFDataFormatException: malformed input around byte 32
    at java.io.DataInputStream.readUTF(DataInputStream.java:656) ~[?:1.8.0_201]
    at java.io.DataInputStream.readUTF(DataInputStream.java:564) ~[?:1.8.0_201]
    at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:379)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:155)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 

[jira] [Created] (FLINK-31356) Serialize garbled characters at checkpoint

2023-03-07 Thread John (Jira)
John created FLINK-31356:


 Summary: Serialize garbled characters at checkpoint
 Key: FLINK-31356
 URL: https://issues.apache.org/jira/browse/FLINK-31356
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.13.6
Reporter: John


 
{panel:title=The last checkpoint of the program was successful}
2023-03-07 08:33:16,085 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 
8b5720a4a40f50b995c97c6fe5b93079.
2023-03-07 08:33:16,918 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed 
checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 
849 ms).
2023-03-07 08:33:16,918 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
checkpoint 39126 as completed for source Source: userKafkaDataStream.
2023-03-07 08:36:10,444 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
UnifiedUser_mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from 
RUNNING to FAILED on container_e38_1676011848026_0012_01_02 @ 
ecs-iovc-prd-flink-0006 (dataPort=44633).
java.lang.RuntimeException: Writing records to JDBC failed.
    at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
 ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
{panel}
{panel:title=But from this checkpoint restore, it can't be decoded}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of 
the 1 provided restore options.
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
    at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: java.io.UTFDataFormatException: malformed input around byte 32
    at java.io.DataInputStream.readUTF(DataInputStream.java:656) ~[?:1.8.0_201]
    at java.io.DataInputStream.readUTF(DataInputStream.java:564) ~[?:1.8.0_201]
    at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:379)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:155)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
 ~[flink-core-1.13.6.jar:1.13.6]
    at 
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at 

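For context on the UTFDataFormatException above: DataInputStream.readUTF fails as soon as the bytes under its cursor are not valid modified UTF-8, which can happen when the checkpointed bytes no longer line up with the reader's POJO layout (for example after a field was added or its type changed). A standalone sketch of that failure mode, not taken from the report:

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MalformedUtfSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        // Bytes that are not valid modified UTF-8: a length header of 2, followed by
        // a 2-byte sequence lead byte (0xC3) and an invalid continuation byte (0x28).
        out.writeShort(2);
        out.writeByte(0xC3);
        out.writeByte(0x28);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        // Throws java.io.UTFDataFormatException: malformed input around byte 2,
        // the same failure mode reported during the state restore above.
        in.readUTF();
    }
}
{code}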
[GitHub] [flink] jiangxin369 commented on a diff in pull request #22034: [FLINK-31240][table] Reduce the overhead of conversion between DataStream and Table

2023-03-07 Thread via GitHub


jiangxin369 commented on code in PR #22034:
URL: https://github.com/apache/flink/pull/22034#discussion_r1127524788


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws 
Exception {
 Row.of("c", 1000));
 }
 
+@Test
+public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+env.setParallelism(1);
+final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+
+final List<Tuple2<Long, String>> tuples =
+Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), 
Tuple2.of(3L, "c"));
+
+final DataStream<Tuple2<Long, String>> dataStream =
+env.fromCollection(tuples, Types.TUPLE(Types.LONG, 
Types.STRING));
+
+final Table table = tableEnv.fromDataStream(dataStream);
+
+final DataStream<Tuple2<Long, String>> convertedDataStream =
+tableEnv.toDataStream(table, 
DataTypes.of(dataStream.getType()));
+
+assertEquals(dataStream, convertedDataStream);
+testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+final Table tableWithPK =
+tableEnv.fromDataStream(
+dataStream,
+Schema.newBuilder()
+.column("f0", BIGINT().notNull())
+.column("f1", STRING())
+.primaryKey("f0")
+.build());
+final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+tableEnv.toDataStream(tableWithPK, 
DataTypes.of(dataStream.getType()));
+
+assertNotEquals(dataStream, convertedDataStreamWithPK);
+testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+}
+
+@Test
+public void testFromAndToDataStreamBypassWithRow() throws Exception {
+env.setParallelism(1);
+final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+
+final SourceFunction<Row> rowGenerator =
+new SourceFunction<Row>() {
+@Override
+public final void run(SourceContext<Row> ctx) throws 
Exception {
+Row row = new Row(2);
+row.setField(0, 1L);
+row.setField(1, "a");
+ctx.collect(row);
+}
+
+@Override
+public void cancel() {}
+};
+
+final RowTypeInfo typeInfo =
+new RowTypeInfo(new TypeInformation[] {Types.LONG, 
Types.STRING});
+
+// test datastream of rows with non-default name
+DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);
+
+Table table = tableEnv.fromDataStream(dataStream);
+DataStream<Row> convertedDataStream =
+tableEnv.toDataStream(table, 
DataTypes.of(dataStream.getType()));
+
+assertEquals(dataStream, convertedDataStream);
+
+// access rows by default name
+DataStream<Row> transformedDataStream =
+convertedDataStream.map(

Review Comment:
   This transformation makes sure that the returned DataStream can be accessed 
by the default field names (f0, f1, ...).






[GitHub] [flink] jiangxin369 commented on a diff in pull request #22034: [FLINK-31240][table] Reduce the overhead of conversion between DataStream and Table

2023-03-07 Thread via GitHub


jiangxin369 commented on code in PR #22034:
URL: https://github.com/apache/flink/pull/22034#discussion_r1127523251


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws 
Exception {
 Row.of("c", 1000));
 }
 
+@Test
+public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+env.setParallelism(1);
+final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+
+final List<Tuple2<Long, String>> tuples =
+Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), 
Tuple2.of(3L, "c"));
+
+final DataStream<Tuple2<Long, String>> dataStream =
+env.fromCollection(tuples, Types.TUPLE(Types.LONG, 
Types.STRING));
+
+final Table table = tableEnv.fromDataStream(dataStream);
+
+final DataStream<Tuple2<Long, String>> convertedDataStream =
+tableEnv.toDataStream(table, 
DataTypes.of(dataStream.getType()));
+
+assertEquals(dataStream, convertedDataStream);
+testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+final Table tableWithPK =
+tableEnv.fromDataStream(
+dataStream,
+Schema.newBuilder()
+.column("f0", BIGINT().notNull())
+.column("f1", STRING())
+.primaryKey("f0")
+.build());
+final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+tableEnv.toDataStream(tableWithPK, 
DataTypes.of(dataStream.getType()));
+
+assertNotEquals(dataStream, convertedDataStreamWithPK);
+testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+}
+
+@Test
+public void testFromAndToDataStreamBypassWithRow() throws Exception {
+env.setParallelism(1);
+final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
+
+final SourceFunction<Row> rowGenerator =
+new SourceFunction<Row>() {
+@Override
+public final void run(SourceContext<Row> ctx) throws 
Exception {
+Row row = new Row(2);
+row.setField(0, 1L);
+row.setField(1, "a");
+ctx.collect(row);
+}
+
+@Override
+public void cancel() {}
+};
+
+final RowTypeInfo typeInfo =
+new RowTypeInfo(new TypeInformation[] {Types.LONG, 
Types.STRING});
+
+// test datastream of rows with non-default name
+DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);

Review Comment:
   `env.fromCollection(...)` generates row data with a RowSerializer that constructs 
the rows with field names, so the rows of the DataStream could be accessed by 
non-default names anyway. But we want to test that the code change works even if 
the DataStream is composed of position-based rows without field names, as 
illustrated below.
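
   A standalone illustration of that distinction (hedged, not code from this PR): a 
row built with `Row.of(...)` carries only positions, while `Row.withNames()` carries 
field names.

   import org.apache.flink.types.Row;

   public class RowNamingSketch {
       public static void main(String[] args) {
           Row positional = Row.of(1L, "a");
           System.out.println(positional.getField(0));   // 1 -- position-based access only

           Row named = Row.withNames();
           named.setField("id", 1L);
           named.setField("name", "a");
           System.out.println(named.getField("name"));   // "a" -- name-based access
       }
   }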






[GitHub] [flink] echauchot commented on pull request #21636: [FLINK-30113] Implement state compression for broadcast and regular operator states

2023-03-07 Thread via GitHub


echauchot commented on PR #21636:
URL: https://github.com/apache/flink/pull/21636#issuecomment-1457760168

   > What you did for versioning looks sane. I think the problem is with line
   > 
   > 
https://github.com/apache/flink/blob/a4a11a389609deb5d1de139115cedc35b675b366/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java#L68
   > 
   > which has not been implemented in a future proof way. The intent there was 
to throw an exception for `readVersion < 6`, not less than 
`CURRENT_STATE_META_INFO_SNAPSHOT_VERSION`. If we change the condition to 
`readVersion < 6` it should work.
   
   Hi @dawidwys, thanks for the prompt answer. I suspected something was not future 
proof, thanks for the confirmation. It is now fixed. Tests pass, but the CI has been 
failing in the preparation of the E2E tests (for some days now) with an HTTP 404 error.
   
   PTAL.
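
   A standalone sketch of the versioning pitfall discussed above (constant and method 
names are illustrative, not the actual StateMetaInfoSnapshotReadersWriters code):

   public class VersionCheckSketch {
       static final int CURRENT_VERSION = 7;           // assume the version was just bumped
       static final int OLDEST_SUPPORTED_VERSION = 6;  // oldest layout the reader understands

       // Not future proof: a valid version-6 snapshot is rejected as soon as
       // CURRENT_VERSION moves past 6.
       static void checkAgainstCurrent(int readVersion) {
           if (readVersion < CURRENT_VERSION) {
               throw new IllegalStateException("Unsupported read version: " + readVersion);
           }
       }

       // The suggestion from the thread: compare against the oldest supported version,
       // so later version bumps do not invalidate snapshots that are still readable.
       static void checkAgainstOldestSupported(int readVersion) {
           if (readVersion < OLDEST_SUPPORTED_VERSION) {
               throw new IllegalStateException("Unsupported read version: " + readVersion);
           }
       }

       public static void main(String[] args) {
           checkAgainstOldestSupported(6); // passes
           checkAgainstCurrent(6);         // throws, reproducing the reported behaviour
       }
   }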





[GitHub] [flink] TanYuxin-tyx commented on pull request #22112: [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0

2023-03-07 Thread via GitHub


TanYuxin-tyx commented on PR #22112:
URL: https://github.com/apache/flink/pull/22112#issuecomment-1457759198

   Now LGTM, thanks @reswqa for fixing the issue.





[jira] [Updated] (FLINK-31341) OutOfMemoryError in Kafka e2e tests

2023-03-07 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-31341:
--
Priority: Critical  (was: Blocker)

> OutOfMemoryError in Kafka e2e tests
> ---
>
> Key: FLINK-31341
> URL: https://issues.apache.org/jira/browse/FLINK-31341
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> We experience a OOM in Kafka e2e tests:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46820=logs=fb37c667-81b7-5c22-dd91-846535e99a97=39a035c3-c65e-573c-fb66-104c66c28912=11726
> {code}
> Mar 06 06:22:30 [ERROR] Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "ForkJoinPool-1-worker-0"
> Exception in thread "ForkJoinPool-1-worker-0" Mar 06 06:27:30 [ERROR] Tests 
> run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1,094.139 s <<< 
> FAILURE! - in JUnit Jupiter
> Mar 06 06:27:30 [ERROR] JUnit Jupiter.JUnit Jupiter  Time elapsed: 947.463 s  
> <<< ERROR!
> Mar 06 06:27:30 org.junit.platform.commons.JUnitException: TestEngine with ID 
> 'junit-jupiter' failed to execute tests
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:153)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
> Mar 06 06:27:30   at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
> Mar 06 06:27:30   at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
> Mar 06 06:27:30   at 
> org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
> Mar 06 06:27:30   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
> Mar 06 06:27:30   at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
> Mar 06 06:27:30   at 
> org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
> Mar 06 06:27:30   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> Mar 06 06:27:30 Caused by: org.junit.platform.commons.JUnitException: Error 
> executing tests for engine junit-jupiter
> Mar 06 06:27:30   at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:57)
> Mar 06 06:27:30   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
> Mar 06 06:27:30   ... 16 more
> Mar 06 06:27:30 Caused by: java.util.concurrent.ExecutionException: 
> java.lang.OutOfMemoryError
> Mar 06 06:27:30   at 
> java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
> Mar 06 06:27:30   at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
> Mar 06 06:27:30   ... 17 more
> Mar 06 06:27:30 Caused by: java.lang.OutOfMemoryError
> Mar 06 06:27:30   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Mar 06 06:27:30   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Mar 06 06:27:30   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Mar 06 06:27:30   at 
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> Mar 06 06:27:30   at 
> 

[GitHub] [flink] reswqa commented on pull request #22112: [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0

2023-03-07 Thread via GitHub


reswqa commented on PR #22112:
URL: https://github.com/apache/flink/pull/22112#issuecomment-1457747179

   Thanks @TanYuxin-tyx for the review. I have updated this according to your 
comments; please take a look again.





[jira] [Updated] (FLINK-31342) SQLClientSchemaRegistryITCase timed out when starting the test container

2023-03-07 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-31342:
--
Priority: Critical  (was: Blocker)

> SQLClientSchemaRegistryITCase timed out when starting the test container
> 
>
> Key: FLINK-31342
> URL: https://issues.apache.org/jira/browse/FLINK-31342
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46820=logs=fb37c667-81b7-5c22-dd91-846535e99a97=39a035c3-c65e-573c-fb66-104c66c28912=11767
> {code}
> Mar 06 06:53:47 [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 1, 
> Time elapsed: 1,037.927 s <<< FAILURE! - in 
> org.apache.flink.tests.util.kafka.SQLClientSchemaRegistryITCase
> Mar 06 06:53:47 [ERROR] 
> org.apache.flink.tests.util.kafka.SQLClientSchemaRegistryITCase  Time 
> elapsed: 1,037.927 s  <<< ERROR!
> Mar 06 06:53:47 org.junit.runners.model.TestTimedOutException: test timed out 
> after 10 minutes
> Mar 06 06:53:47   at sun.misc.Unsafe.park(Native Method)
> Mar 06 06:53:47   at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Mar 06 06:53:47   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> Mar 06 06:53:47   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> Mar 06 06:53:47   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> Mar 06 06:53:47   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Mar 06 06:53:47   at 
> org.testcontainers.containers.GenericContainer.start(GenericContainer.java:323)
> Mar 06 06:53:47   at 
> org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1063)
> Mar 06 06:53:47   at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Mar 06 06:53:47   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Mar 06 06:53:47   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Mar 06 06:53:47   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Mar 06 06:53:47   at java.lang.Thread.run(Thread.java:750)
> {code}





[GitHub] [flink] reswqa commented on a diff in pull request #22112: [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22112:
URL: https://github.com/apache/flink/pull/22112#discussion_r1127507881


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java:
##
@@ -436,7 +442,17 @@ private void mayTriggerReading() {
 && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() 
<= maxRequestedBuffers
 && numRequestedBuffers < 
bufferPool.getAverageBuffersPerRequester()) {
 isRunning = true;
-ioExecutor.execute(this);
+ioExecutor.execute(
+() -> {
+try {
+run();
+} catch (Exception e) {

Review Comment:
   Good catch.






[jira] [Commented] (FLINK-31339) PlannerScalaFreeITCase.testImperativeUdaf

2023-03-07 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697292#comment-17697292
 ] 

Matthias Pohl commented on FLINK-31339:
---

[~qafro] could you have a look at it?

> PlannerScalaFreeITCase.testImperativeUdaf
> -
>
> Key: FLINK-31339
> URL: https://issues.apache.org/jira/browse/FLINK-31339
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> {{PlannerScalaFreeITCase.testImperativeUdaf}} failed:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46812=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=15012
> {code}
> Mar 05 05:55:50 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 62.028 s <<< FAILURE! - in 
> org.apache.flink.table.sql.codegen.PlannerScalaFreeITCase
> Mar 05 05:55:50 [ERROR] PlannerScalaFreeITCase.testImperativeUdaf  Time 
> elapsed: 40.924 s  <<< FAILURE!
> Mar 05 05:55:50 org.opentest4j.AssertionFailedError: Did not get expected 
> results before timeout, actual result: 
> [{"before":null,"after":{"user_name":"Bob","order_cnt":1},"op":"c"}, 
> {"before":null,"after":{"user_name":"Alice","order_cnt":1},"op":"c"}, 
> {"before":{"user_name":"Bob","order_cnt":1},"after":null,"op":"d"}, 
> {"before":null,"after":{"user_name":"Bob","order_cnt":2},"op":"c"}]. ==> 
> expected: <true> but was: <false>
> Mar 05 05:55:50   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> Mar 05 05:55:50   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> Mar 05 05:55:50   at 
> org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> Mar 05 05:55:50   at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> Mar 05 05:55:50   at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
> Mar 05 05:55:50   at 
> org.apache.flink.table.sql.codegen.SqlITCaseBase.checkJsonResultFile(SqlITCaseBase.java:168)
> Mar 05 05:55:50   at 
> org.apache.flink.table.sql.codegen.SqlITCaseBase.runAndCheckSQL(SqlITCaseBase.java:111)
> Mar 05 05:55:50   at 
> org.apache.flink.table.sql.codegen.PlannerScalaFreeITCase.testImperativeUdaf(PlannerScalaFreeITCase.java:43)
> [...]
> {code}





[jira] [Commented] (FLINK-31355) CommonDataStreamTests.test_execute_and_collect failed

2023-03-07 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697291#comment-17697291
 ] 

Matthias Pohl commented on FLINK-31355:
---

[~hxbks2ks] can you have a look at it?

> CommonDataStreamTests.test_execute_and_collect failed
> -
>
> Key: FLINK-31355
> URL: https://issues.apache.org/jira/browse/FLINK-31355
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46883=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=32651
> {code}
> Mar 07 07:20:04 ERRORroot:java_gateway.py:1055 Exception while sending 
> command.
> Mar 07 07:20:04 Traceback (most recent call last):
> Mar 07 07:20:04   File 
> "/__w/1/s/flink-python/.tox/py310-cython/lib/python3.10/site-packages/py4j/java_gateway.py",
>  line 1224, in send_command
> Mar 07 07:20:04 raise Py4JNetworkError("Answer from Java side is empty")
> Mar 07 07:20:04 py4j.protocol.Py4JNetworkError: Answer from Java side is empty
> {code}





[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22112: [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0

2023-03-07 Thread via GitHub


TanYuxin-tyx commented on code in PR #22112:
URL: https://github.com/apache/flink/pull/22112#discussion_r1127307920


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java:
##
@@ -436,7 +442,17 @@ private void mayTriggerReading() {
 && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() 
<= maxRequestedBuffers
 && numRequestedBuffers < 
bufferPool.getAverageBuffersPerRequester()) {
 isRunning = true;
-ioExecutor.execute(this);
+ioExecutor.execute(
+() -> {
+try {
+run();
+} catch (Exception e) {

Review Comment:
   @reswqa Thanks for fixing the bug. I only have a minor comment: maybe we can 
catch `Throwable` here instead.
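
   For context, a standalone sketch (not the Flink code) of why the scheduled task 
has to handle its own failures: an exception thrown inside a task passed to 
`Executor#execute` never reaches the code that scheduled it, so unless the task 
catches the failure and reacts (for example by failing the pending readers), callers 
keep waiting.

   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class SwallowedTaskFailureSketch {
       public static void main(String[] args) {
           ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
           ioExecutor.execute(
                   () -> {
                       // Simulates an unexpected failure during the scheduled read.
                       throw new RuntimeException("buffer request failed");
                   });
           // The exception above only hits the worker thread's uncaught-exception
           // handler; the submitting thread is never notified of the failure.
           ioExecutor.shutdown();
       }
   }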






[jira] [Assigned] (FLINK-31354) NettyClientServerSslTest.testValidSslConnectionAdvanced timed out

2023-03-07 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-31354:
-

Assignee: Matthias Pohl

> NettyClientServerSslTest.testValidSslConnectionAdvanced timed out
> -
>
> Key: FLINK-31354
> URL: https://issues.apache.org/jira/browse/FLINK-31354
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Blocker
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46883=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8242
> {code}
> Mar 07 05:15:21 [ERROR] 
> NettyClientServerSslTest.testValidSslConnectionAdvanced  Time elapsed: 3.69 s 
>  <<< ERROR!
> Mar 07 05:15:21 
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandshakeTimeoutException:
>  handshake timed out after 1000ms
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler$7.run(SslHandler.java:2115)
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
> Mar 07 05:15:21   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> Mar 07 05:15:21   at java.lang.Thread.run(Thread.java:748)
> {code}





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127499052


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) {
 
 currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
+// reset overdraft buffers
+while (numberOfRequestedOverdraftMemorySegments > 0
+&& numberOfRequestedMemorySegments < currentPoolSize) {

Review Comment:
   For the case where the pool size becomes smaller, I prefer not to do any special 
handling, because we already allow `numRequestedBuffers` to exceed the pool size 
after `setNumBuffers`. WDYT?






[GitHub] [flink] reswqa commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1457725482

   Thanks @1996fanrui for the review and for catching the bug. I have updated this 
according to your comments; please take a look again.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127489392


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
 localBufferPool.lazyDestroy();
 
 // All buffers have been requested, but can not be returned yet.
-assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
 // Recycle should return buffers to memory segment pool
 for (Buffer buffer : requests) {
 buffer.recycleBuffer();
 }
 }
 
+@Test
+void testDecreasePoolSize() throws Exception {
+LocalBufferPool bufferPool =
+new LocalBufferPool(networkBufferPool, 4, 10, 0, 
Integer.MAX_VALUE, 2);
+Queue<MemorySegment> buffers = new LinkedList<>();
+
+// set pool size to 5.
+bufferPool.setNumBuffers(5);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+// request all buffer.
+for (int i = 0; i < 5; i++) {
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+}
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// request 1 overdraft buffers.
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// set pool size to 4.
+bufferPool.setNumBuffers(4);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return all overdraft buffers.
+bufferPool.recycle(buffers.poll());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+bufferPool.recycle(buffers.poll());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return the excess buffer.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.isAvailable()).isFalse();
+// return non-excess buffers.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isTrue();
+
+while (!buffers.isEmpty()) {
+bufferPool.recycle(buffers.poll());
+}
+bufferPool.lazyDestroy();
+}
+
+@Test
+void testIncreasePoolSize() throws Exception {
+LocalBufferPool bufferPool =
+new LocalBufferPool(networkBufferPool, 5, 100, 0, 
Integer.MAX_VALUE, 2);
+List<MemorySegment> buffers = new ArrayList<>();
+
+// set pool size to 5.
+bufferPool.setNumBuffers(5);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+// request all buffer.
+for (int i = 0; i < 5; i++) {
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+}
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// request 2 overdraft buffers.
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+assertThat(bufferPool.requestMemorySegment()).isNull();
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// set pool size to 10.
+bufferPool.setNumBuffers(10);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(10);
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+// available status will not be influenced by overdraft.
+assertThat(bufferPool.isAvailable()).isTrue();
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+assertThat(bufferPool.isAvailable()).isTrue();

Review Comment:
   Good catch! I think the second one is better. If the pool size becomes larger, 
some overdraft buffers need to be converted to requested buffers, as sketched below.
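
   A hedged sketch of that conversion (the field names follow the diff above, but 
this is a simplified stand-in, not the actual LocalBufferPool):

   public class OverdraftAccountingSketch {
       private int currentPoolSize;
       private int numberOfRequestedMemorySegments;
       private int numberOfRequestedOverdraftMemorySegments;

       // Called when the pool size grows: buffers previously counted as overdraft are
       // re-accounted as ordinary requested buffers up to the new pool size.
       void convertOverdraftToRequested(int newPoolSize) {
           currentPoolSize = newPoolSize;
           while (numberOfRequestedOverdraftMemorySegments > 0
                   && numberOfRequestedMemorySegments < currentPoolSize) {
               numberOfRequestedOverdraftMemorySegments--;
               numberOfRequestedMemorySegments++;
           }
       }
   }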




[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127487679


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
 localBufferPool.lazyDestroy();
 
 // All buffers have been requested, but can not be returned yet.
-assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
 // Recycle should return buffers to memory segment pool
 for (Buffer buffer : requests) {
 buffer.recycleBuffer();
 }
 }
 
+@Test
+void testDecreasePoolSize() throws Exception {
+LocalBufferPool bufferPool =
+new LocalBufferPool(networkBufferPool, 4, 10, 0, 
Integer.MAX_VALUE, 2);
+Queue<MemorySegment> buffers = new LinkedList<>();
+
+// set pool size to 5.
+bufferPool.setNumBuffers(5);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+// request all buffer.
+for (int i = 0; i < 5; i++) {
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+}
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// request 1 overdraft buffers.
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// set pool size to 4.
+bufferPool.setNumBuffers(4);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return all overdraft buffers.
+bufferPool.recycle(buffers.poll());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+bufferPool.recycle(buffers.poll());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return the excess buffer.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.isAvailable()).isFalse();
+// return non-excess buffers.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isTrue();
+
+while (!buffers.isEmpty()) {
+bufferPool.recycle(buffers.poll());
+}
+bufferPool.lazyDestroy();
+}
+
+@Test
+void testIncreasePoolSize() throws Exception {
+LocalBufferPool bufferPool =
+new LocalBufferPool(networkBufferPool, 5, 100, 0, 
Integer.MAX_VALUE, 2);
+List<MemorySegment> buffers = new ArrayList<>();
+
+// set pool size to 5.
+bufferPool.setNumBuffers(5);

Review Comment:
   Good suggestion.





