[jira] [Created] (FLINK-31357) A record is deleted before being inserted, it will be deleted
Fangliang Liu created FLINK-31357:
-
Summary: A record is deleted before being inserted, it will be deleted
Key: FLINK-31357
URL: https://issues.apache.org/jira/browse/FLINK-31357
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.14.3
Reporter: Fangliang Liu

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31354) NettyClientServerSslTest.testValidSslConnectionAdvanced timed out
[ https://issues.apache.org/jira/browse/FLINK-31354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Pohl updated FLINK-31354:
--
Description:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46883=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8242
{code}
Test testValidSslConnectionAdvanced[SSL provider = JDK](org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest) is running.
05:15:10,904 [main] INFO org.apache.flink.runtime.io.network.netty.NettyConfig[] - NettyConfig [server address: localhost/127.0.0.1, server port: 42935, ssl enabled: true, memory segment size (bytes): 1024, transport type: AUTO, number of server threads: 1 (manual), number of client threads>
05:15:10,916 [main] INFO org.apache.flink.runtime.io.network.netty.NettyServer[] - Transport type 'auto': using EPOLL.
05:15:12,149 [main] INFO org.apache.flink.runtime.io.network.netty.NettyServer[] - Successful initialization (took 1245 ms). Listening on SocketAddress /127.0.0.1:42935.
05:15:12,150 [main] INFO org.apache.flink.runtime.io.network.netty.NettyClient[] - Transport type 'auto': using EPOLL.
05:15:13,249 [main] INFO org.apache.flink.runtime.io.network.netty.NettyClient[] - Successful initialization (took 1099 ms).
05:15:14,588 [main] ERROR org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest [] - Test testValidSslConnectionAdvanced[SSL provider = JDK](org.apache.flink.runtime.io.network.netty.NettyClientServerSslTest) failed with:
org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 1000ms
    at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler$7.run(SslHandler.java:2115)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
{code}

was:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46883=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8242
{code}
Mar 07 05:15:21 [ERROR] NettyClientServerSslTest.testValidSslConnectionAdvanced Time elapsed: 3.69 s <<< ERROR!
Mar 07 05:15:21 org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 1000ms
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler$7.run(SslHandler.java:2115)
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
Mar 07 05:15:21     at java.lang.Thread.run(Thread.java:748)
{code}

> NettyClientServerSslTest.testValidSslConnectionAdvanced timed out
>
[jira] [Updated] (FLINK-31356) Serialize garbled characters at checkpoint
[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John updated FLINK-31356:
-
Description:
{panel:title=The last checkpoint of the program was successful}
2023-03-07 08:33:16,085 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 8b5720a4a40f50b995c97c6fe5b93079.
2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 849 ms).
2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 39126 as completed for source Source: kafkaDataStream.
2023-03-07 08:36:10,444 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to FAILED on container_e38_1676011848026_0012_01_02 @ (dataPort=44633).
java.lang.RuntimeException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153) ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
{panel}
{panel:title=But from this checkpoint restore, it can't be decoded}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: java.io.UTFDataFormatException: malformed input around byte 32
    at java.io.DataInputStream.readUTF(DataInputStream.java:656) ~[?:1.8.0_201]
    at java.io.DataInputStream.readUTF(DataInputStream.java:564) ~[?:1.8.0_201]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:379) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:155) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at
[jira] [Updated] (FLINK-31356) Serialize garbled characters at checkpoint
[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John updated FLINK-31356:
-
Description:
{panel:title=The last checkpoint of the program was successful}
2023-03-07 08:33:16,085 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 8b5720a4a40f50b995c97c6fe5b93079.
2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 849 ms).
2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 39126 as completed for source Source: userKafkaDataStream.
2023-03-07 08:36:10,444 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to FAILED on container_e38_1676011848026_0012_01_02 @ (dataPort=44633).
java.lang.RuntimeException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153) ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
{panel}
{panel:title=But from this checkpoint restore, it can't be decoded}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: java.io.UTFDataFormatException: malformed input around byte 32
    at java.io.DataInputStream.readUTF(DataInputStream.java:656) ~[?:1.8.0_201]
    at java.io.DataInputStream.readUTF(DataInputStream.java:564) ~[?:1.8.0_201]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:379) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:155) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at
[jira] [Created] (FLINK-31356) Serialize garbled characters at checkpoint
John created FLINK-31356:

Summary: Serialize garbled characters at checkpoint
Key: FLINK-31356
URL: https://issues.apache.org/jira/browse/FLINK-31356
Project: Flink
Issue Type: Bug
Components: API / Type Serialization System
Affects Versions: 1.13.6
Reporter: John

{panel:title=The last checkpoint of the program was successful}
2023-03-07 08:33:16,085 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 8b5720a4a40f50b995c97c6fe5b93079.
2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 849 ms).
2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 39126 as completed for source Source: userKafkaDataStream.
2023-03-07 08:36:10,444 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: UnifiedUser_mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to FAILED on container_e38_1676011848026_0012_01_02 @ ecs-iovc-prd-flink-0006 (dataPort=44633).
java.lang.RuntimeException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153) ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
{panel}
{panel:title=But from this checkpoint restore, it can't be decoded}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    ... 10 more
Caused by: java.io.UTFDataFormatException: malformed input around byte 32
    at java.io.DataInputStream.readUTF(DataInputStream.java:656) ~[?:1.8.0_201]
    at java.io.DataInputStream.readUTF(DataInputStream.java:564) ~[?:1.8.0_201]
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:379) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:155) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43) ~[flink-core-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at
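The innermost cause in the trace above is `DataInputStream.readUTF` rejecting bytes that are not valid modified UTF-8. The following is a minimal sketch of how that exception can arise in general, using only JDK streams; the class and method names (`ReadUtfDemo`, `roundTrip`, `failsOnMalformed`) are hypothetical, and nothing here claims to reproduce the Flink job or to identify the actual root cause of FLINK-31356.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;

// Illustration only: plain JDK streams, no Flink classes involved.
final class ReadUtfDemo {

    // Writes a string with writeUTF and reads it back with readUTF.
    static String roundTrip(String value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeUTF(value);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return in.readUTF();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Feeds readUTF a 2-byte length prefix followed by a byte that cannot start
    // a modified UTF-8 character; returns true if readUTF rejects the input.
    static boolean failsOnMalformed() {
        // Length prefix = 1, then a lone continuation byte (0x80).
        byte[] bytes = {0x00, 0x01, (byte) 0x80};
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readUTF();
            return false;
        } catch (UTFDataFormatException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("round trip: " + roundTrip("checkpoint"));
        System.out.println("malformed input rejected: " + failsOnMalformed());
    }
}
```

One common way to hit this in practice is a reader that expects a `writeUTF` string at a position where the writer stored something else, for example when the two sides disagree on the serialized layout. Whether that applies to this report is an open question.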
[GitHub] [flink] jiangxin369 commented on a diff in pull request #22034: [FLINK-31240][table] Reduce the overhead of conversion between DataStream and Table
jiangxin369 commented on code in PR #22034:
URL: https://github.com/apache/flink/pull/22034#discussion_r1127524788

##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {
+                        Row row = new Row(2);
+                        row.setField(0, 1L);
+                        row.setField(1, "a");
+                        ctx.collect(row);
+                    }
+
+                    @Override
+                    public void cancel() {}
+                };
+
+        final RowTypeInfo typeInfo =
+                new RowTypeInfo(new TypeInformation[] {Types.LONG, Types.STRING});
+
+        // test datastream of rows with non-default name
+        DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);
+
+        Table table = tableEnv.fromDataStream(dataStream);
+        DataStream<Row> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+
+        // access rows by default name
+        DataStream<Row> transformedDataStream =
+                convertedDataStream.map(

Review Comment:
This transformation makes sure that the returned datastream can be accessed by the default names (f0, f1, ...).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] jiangxin369 commented on a diff in pull request #22034: [FLINK-31240][table] Reduce the overhead of conversion between DataStream and Table
jiangxin369 commented on code in PR #22034:
URL: https://github.com/apache/flink/pull/22034#discussion_r1127523251

##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##
@@ -377,6 +385,102 @@ public void testFromAndToDataStreamEventTime() throws Exception {
                 Row.of("c", 1000));
     }
 
+    @Test
+    public void testFromAndToDataStreamBypassWithPojo() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final List<Tuple2<Long, String>> tuples =
+                Arrays.asList(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"));
+
+        final DataStream<Tuple2<Long, String>> dataStream =
+                env.fromCollection(tuples, Types.TUPLE(Types.LONG, Types.STRING));
+
+        final Table table = tableEnv.fromDataStream(dataStream);
+
+        final DataStream<Tuple2<Long, String>> convertedDataStream =
+                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType()));
+
+        assertEquals(dataStream, convertedDataStream);
+        testResult(convertedDataStream, tuples.toArray(new Tuple2[0]));
+
+        final Table tableWithPK =
+                tableEnv.fromDataStream(
+                        dataStream,
+                        Schema.newBuilder()
+                                .column("f0", BIGINT().notNull())
+                                .column("f1", STRING())
+                                .primaryKey("f0")
+                                .build());
+        final DataStream<Tuple2<Long, String>> convertedDataStreamWithPK =
+                tableEnv.toDataStream(tableWithPK, DataTypes.of(dataStream.getType()));
+
+        assertNotEquals(dataStream, convertedDataStreamWithPK);
+        testResult(convertedDataStreamWithPK, tuples.toArray(new Tuple2[0]));
+    }
+
+    @Test
+    public void testFromAndToDataStreamBypassWithRow() throws Exception {
+        env.setParallelism(1);
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        final SourceFunction<Row> rowGenerator =
+                new SourceFunction<Row>() {
+                    @Override
+                    public final void run(SourceContext<Row> ctx) throws Exception {
+                        Row row = new Row(2);
+                        row.setField(0, 1L);
+                        row.setField(1, "a");
+                        ctx.collect(row);
+                    }
+
+                    @Override
+                    public void cancel() {}
+                };
+
+        final RowTypeInfo typeInfo =
+                new RowTypeInfo(new TypeInformation[] {Types.LONG, Types.STRING});
+
+        // test datastream of rows with non-default name
+        DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);

Review Comment:
`env.fromCollection(...)` generates row data with `RowSerializer`, which constructs the rows with field names, so the rows of the datastream could be accessed by non-default names anyway. But we want to test that the code change works even if the datastream is composed of positioned rows without field names.
[GitHub] [flink] echauchot commented on pull request #21636: [FLINK-30113] Implement state compression for broadcast and regular operator states
echauchot commented on PR #21636:
URL: https://github.com/apache/flink/pull/21636#issuecomment-1457760168

> What you did for versioning looks sane. I think the problem is with line
>
> https://github.com/apache/flink/blob/a4a11a389609deb5d1de139115cedc35b675b366/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java#L68
>
> which has not been implemented in a future proof way. The intent there was to throw an exception for `readVersion < 6`, not less than `CURRENT_STATE_META_INFO_SNAPSHOT_VERSION`. If we change the condition to `readVersion < 6` it should work.

Hi @dawidwys, thanks for the prompt answer. I suspected something was not future-proof, thanks for the confirmation. It is now fixed. The tests pass, but for some days now the CI has been failing while preparing the E2E tests, with an HTTP 404 error. PTAL.
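The versioning point in the quoted reply can be sketched as follows. This is an illustration under stated assumptions, not the code in `StateMetaInfoSnapshotReadersWriters`: the class, constant, and method names are hypothetical, the lower bound 6 is taken from the quoted comment, and the current version 7 is an assumed value standing in for "after a version bump".

```java
// Illustration only: names and the value 7 are hypothetical, not Flink's.
final class SnapshotVersionCheck {

    // Oldest format the reader is meant to accept (the quoted comment names 6).
    static final int MIN_SUPPORTED_VERSION = 6;

    // Compares against the current version: every version bump silently
    // starts rejecting the previous format as well.
    static boolean acceptsComparedToCurrent(int readVersion, int currentVersion) {
        return readVersion >= currentVersion;
    }

    // Compares against a fixed lower bound: a version bump does not change
    // which older formats are accepted.
    static boolean acceptsComparedToMinimum(int readVersion) {
        return readVersion >= MIN_SUPPORTED_VERSION;
    }

    public static void main(String[] args) {
        int currentVersion = 7; // assumed: the version after a bump from 6
        for (int readVersion = 5; readVersion <= currentVersion; readVersion++) {
            System.out.printf(
                    "readVersion=%d comparedToCurrent=%b comparedToMinimum=%b%n",
                    readVersion,
                    acceptsComparedToCurrent(readVersion, currentVersion),
                    acceptsComparedToMinimum(readVersion));
        }
    }
}
```

With the assumed values, a snapshot written at version 6 is rejected by the first check and accepted by the second, which is the behavior difference the reply describes.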
[GitHub] [flink] TanYuxin-tyx commented on pull request #22112: [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0
TanYuxin-tyx commented on PR #22112:
URL: https://github.com/apache/flink/pull/22112#issuecomment-1457759198

Now LGTM, thanks @reswqa for fixing the issue.
[jira] [Updated] (FLINK-31341) OutOfMemoryError in Kafka e2e tests
[ https://issues.apache.org/jira/browse/FLINK-31341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Pohl updated FLINK-31341:
--
Priority: Critical (was: Blocker)

> OutOfMemoryError in Kafka e2e tests
> ---
>
> Key: FLINK-31341
> URL: https://issues.apache.org/jira/browse/FLINK-31341
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.0
> Reporter: Matthias Pohl
> Priority: Critical
> Labels: test-stability
>
> We experience an OOM in Kafka e2e tests:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46820=logs=fb37c667-81b7-5c22-dd91-846535e99a97=39a035c3-c65e-573c-fb66-104c66c28912=11726
> {code}
> Mar 06 06:22:30 [ERROR] Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "ForkJoinPool-1-worker-0"
> Exception in thread "ForkJoinPool-1-worker-0" Mar 06 06:27:30 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1,094.139 s <<< FAILURE! - in JUnit Jupiter
> Mar 06 06:27:30 [ERROR] JUnit Jupiter.JUnit Jupiter Time elapsed: 947.463 s <<< ERROR!
> Mar 06 06:27:30 org.junit.platform.commons.JUnitException: TestEngine with ID 'junit-jupiter' failed to execute tests
> Mar 06 06:27:30     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:153)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
> Mar 06 06:27:30     at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
> Mar 06 06:27:30     at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
> Mar 06 06:27:30     at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
> Mar 06 06:27:30     at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
> Mar 06 06:27:30     at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
> Mar 06 06:27:30     at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
> Mar 06 06:27:30     at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
> Mar 06 06:27:30 Caused by: org.junit.platform.commons.JUnitException: Error executing tests for engine junit-jupiter
> Mar 06 06:27:30     at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:57)
> Mar 06 06:27:30     at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
> Mar 06 06:27:30     ... 16 more
> Mar 06 06:27:30 Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError
> Mar 06 06:27:30     at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
> Mar 06 06:27:30     at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
> Mar 06 06:27:30     ... 17 more
> Mar 06 06:27:30 Caused by: java.lang.OutOfMemoryError
> Mar 06 06:27:30     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Mar 06 06:27:30     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Mar 06 06:27:30     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Mar 06 06:27:30     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> Mar 06 06:27:30     at
>
[GitHub] [flink] reswqa commented on pull request #22112: [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0
reswqa commented on PR #22112:
URL: https://github.com/apache/flink/pull/22112#issuecomment-1457747179

Thanks @TanYuxin-tyx for the review. I have updated the PR according to your comments; please take another look.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31342) SQLClientSchemaRegistryITCase timed out when starting the test container
[ https://issues.apache.org/jira/browse/FLINK-31342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Pohl updated FLINK-31342:
----------------------------------
    Priority: Critical (was: Blocker)

> SQLClientSchemaRegistryITCase timed out when starting the test container
> -------------------------------------------------------------------------
>
> Key: FLINK-31342
> URL: https://issues.apache.org/jira/browse/FLINK-31342
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.0
> Reporter: Matthias Pohl
> Priority: Critical
> Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46820=logs=fb37c667-81b7-5c22-dd91-846535e99a97=39a035c3-c65e-573c-fb66-104c66c28912=11767
> {code}
> Mar 06 06:53:47 [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 1, Time elapsed: 1,037.927 s <<< FAILURE! - in org.apache.flink.tests.util.kafka.SQLClientSchemaRegistryITCase
> Mar 06 06:53:47 [ERROR] org.apache.flink.tests.util.kafka.SQLClientSchemaRegistryITCase Time elapsed: 1,037.927 s <<< ERROR!
> Mar 06 06:53:47 org.junit.runners.model.TestTimedOutException: test timed out after 10 minutes
> Mar 06 06:53:47     at sun.misc.Unsafe.park(Native Method)
> Mar 06 06:53:47     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Mar 06 06:53:47     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> Mar 06 06:53:47     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> Mar 06 06:53:47     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> Mar 06 06:53:47     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Mar 06 06:53:47     at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:323)
> Mar 06 06:53:47     at org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1063)
> Mar 06 06:53:47     at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Mar 06 06:53:47     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Mar 06 06:53:47     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Mar 06 06:53:47     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Mar 06 06:53:47     at java.lang.Thread.run(Thread.java:750)
> {code}
[GitHub] [flink] reswqa commented on a diff in pull request #22112: [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0
reswqa commented on code in PR #22112:
URL: https://github.com/apache/flink/pull/22112#discussion_r1127507881

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java: ##
@@ -436,7 +442,17 @@ private void mayTriggerReading() {
                 && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers
                 && numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
             isRunning = true;
-            ioExecutor.execute(this);
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            run();
+                        } catch (Exception e) {

Review Comment:
Good catch.
[jira] [Commented] (FLINK-31339) PlannerScalaFreeITCase.testImperativeUdaf
[ https://issues.apache.org/jira/browse/FLINK-31339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697292#comment-17697292 ]

Matthias Pohl commented on FLINK-31339:
---------------------------------------

[~qafro] could you have a look at it?

> PlannerScalaFreeITCase.testImperativeUdaf
> -----------------------------------------
>
> Key: FLINK-31339
> URL: https://issues.apache.org/jira/browse/FLINK-31339
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.17.0
> Reporter: Matthias Pohl
> Priority: Blocker
> Labels: test-stability
>
> {{PlannerScalaFreeITCase.testImperativeUdaf}} failed:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46812=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=15012
> {code}
> Mar 05 05:55:50 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 62.028 s <<< FAILURE! - in org.apache.flink.table.sql.codegen.PlannerScalaFreeITCase
> Mar 05 05:55:50 [ERROR] PlannerScalaFreeITCase.testImperativeUdaf Time elapsed: 40.924 s <<< FAILURE!
> Mar 05 05:55:50 org.opentest4j.AssertionFailedError: Did not get expected results before timeout, actual result: [{"before":null,"after":{"user_name":"Bob","order_cnt":1},"op":"c"}, {"before":null,"after":{"user_name":"Alice","order_cnt":1},"op":"c"}, {"before":{"user_name":"Bob","order_cnt":1},"after":null,"op":"d"}, {"before":null,"after":{"user_name":"Bob","order_cnt":2},"op":"c"}]. ==> expected: <true> but was: <false>
> Mar 05 05:55:50     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> Mar 05 05:55:50     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> Mar 05 05:55:50     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> Mar 05 05:55:50     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> Mar 05 05:55:50     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
> Mar 05 05:55:50     at org.apache.flink.table.sql.codegen.SqlITCaseBase.checkJsonResultFile(SqlITCaseBase.java:168)
> Mar 05 05:55:50     at org.apache.flink.table.sql.codegen.SqlITCaseBase.runAndCheckSQL(SqlITCaseBase.java:111)
> Mar 05 05:55:50     at org.apache.flink.table.sql.codegen.PlannerScalaFreeITCase.testImperativeUdaf(PlannerScalaFreeITCase.java:43)
> [...]
> {code}
[jira] [Commented] (FLINK-31355) CommonDataStreamTests.test_execute_and_collect failed
[ https://issues.apache.org/jira/browse/FLINK-31355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697291#comment-17697291 ]

Matthias Pohl commented on FLINK-31355:
---------------------------------------

[~hxbks2ks] can you have a look at it?

> CommonDataStreamTests.test_execute_and_collect failed
> -----------------------------------------------------
>
> Key: FLINK-31355
> URL: https://issues.apache.org/jira/browse/FLINK-31355
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.17.0
> Reporter: Matthias Pohl
> Priority: Blocker
> Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46883=logs=821b528f-1eed-5598-a3b4-7f748b13f261=6bb545dd-772d-5d8c-f258-f5085fba3295=32651
> {code}
> Mar 07 07:20:04 ERROR root:java_gateway.py:1055 Exception while sending command.
> Mar 07 07:20:04 Traceback (most recent call last):
> Mar 07 07:20:04   File "/__w/1/s/flink-python/.tox/py310-cython/lib/python3.10/site-packages/py4j/java_gateway.py", line 1224, in send_command
> Mar 07 07:20:04     raise Py4JNetworkError("Answer from Java side is empty")
> Mar 07 07:20:04 py4j.protocol.Py4JNetworkError: Answer from Java side is empty
> {code}
[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22112: [FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0
TanYuxin-tyx commented on code in PR #22112:
URL: https://github.com/apache/flink/pull/22112#discussion_r1127307920

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java: ##
@@ -436,7 +442,17 @@ private void mayTriggerReading() {
                 && numRequestedBuffers + bufferPool.getNumBuffersPerRequest() <= maxRequestedBuffers
                 && numRequestedBuffers < bufferPool.getAverageBuffersPerRequester()) {
             isRunning = true;
-            ioExecutor.execute(this);
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            run();
+                        } catch (Exception e) {

Review Comment:
@reswqa Thanks for fixing the bug. I only have a minor comment: maybe we can catch `Throwable` here instead of `Exception`.
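To illustrate the review suggestion, here is a minimal standalone sketch of why catching `Throwable` matters for a task handed to an executor: an `Error` (for example an `OutOfMemoryError`) is not an `Exception`, so a `catch (Exception e)` block would let it escape unrecorded. This is not the code from PR #22112. The names `GuardedExecution`, `executeGuarded`, and `runAndCollect` are hypothetical, and storing the failure in an `AtomicReference` is an assumption made purely for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of Flink.
final class GuardedExecution {

    /** Submits the task and records the first Throwable it raises, including Errors. */
    static void executeGuarded(
            ExecutorService executor, Runnable task, AtomicReference<Throwable> failure) {
        executor.execute(
                () -> {
                    try {
                        task.run();
                    } catch (Throwable t) {
                        // catch (Exception e) would miss Errors such as OutOfMemoryError.
                        failure.compareAndSet(null, t);
                    }
                });
    }

    /** Runs the task on a single-thread executor and returns the recorded failure, or null. */
    static Throwable runAndCollect(Runnable task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        executeGuarded(executor, task, failure);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable seen = runAndCollect(() -> { throw new Error("simulated failure"); });
        System.out.println("captured: " + seen);
    }
}
```

In a real scheduler the recorded failure would presumably be propagated to whoever is waiting on the read, rather than only stored; that part is left out here.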
[jira] [Assigned] (FLINK-31354) NettyClientServerSslTest.testValidSslConnectionAdvanced timed out
[ https://issues.apache.org/jira/browse/FLINK-31354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Pohl reassigned FLINK-31354:
-------------------------------------
    Assignee: Matthias Pohl

> NettyClientServerSslTest.testValidSslConnectionAdvanced timed out
> -----------------------------------------------------------------
>
> Key: FLINK-31354
> URL: https://issues.apache.org/jira/browse/FLINK-31354
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.17.0
> Reporter: Matthias Pohl
> Assignee: Matthias Pohl
> Priority: Blocker
> Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46883=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=8242
> {code}
> Mar 07 05:15:21 [ERROR] NettyClientServerSslTest.testValidSslConnectionAdvanced Time elapsed: 3.69 s <<< ERROR!
> Mar 07 05:15:21 org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 1000ms
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler$7.run(SslHandler.java:2115)
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
> Mar 07 05:15:21     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> Mar 07 05:15:21     at java.lang.Thread.run(Thread.java:748)
> {code}
[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127499052

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ##
@@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) {

         currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);

+        // reset overdraft buffers
+        while (numberOfRequestedOverdraftMemorySegments > 0
+                && numberOfRequestedMemorySegments < currentPoolSize) {

Review Comment:
For the case where the pool size becomes smaller, I prefer not to add special handling, because we already allow `numRequestedBuffers` to exceed the `pool size` after `setNumBuffers`. WDYT?
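The loop condition quoted in the hunk above can be read as "while there are overdraft buffers and the pool has room, reclassify an overdraft buffer as an ordinary one". The loop body is not shown in the quoted diff, so the following is only a toy model of that accounting, not Flink's `LocalBufferPool`. The class `ToyBufferAccounting`, its fields, and the decrement/increment body are assumptions made for illustration.

```java
// Hypothetical toy model of the counters only; no real memory segments are involved.
final class ToyBufferAccounting {
    private int poolSize;
    private int requested; // ordinary buffers currently handed out
    private int overdraft; // overdraft buffers currently handed out

    ToyBufferAccounting(int poolSize, int requested, int overdraft) {
        this.poolSize = poolSize;
        this.requested = requested;
        this.overdraft = overdraft;
    }

    /** Resizes the pool and converts overdraft buffers to ordinary ones while room remains. */
    void setPoolSize(int newPoolSize) {
        poolSize = newPoolSize;
        // Assumed loop body: move one buffer from the overdraft count to the ordinary count.
        while (overdraft > 0 && requested < poolSize) {
            overdraft--;
            requested++;
        }
    }

    int getRequested() {
        return requested;
    }

    int getOverdraft() {
        return overdraft;
    }

    public static void main(String[] args) {
        ToyBufferAccounting pool = new ToyBufferAccounting(5, 5, 2);
        pool.setPoolSize(10);
        System.out.println(pool.getRequested() + " requested, " + pool.getOverdraft() + " overdraft");
    }
}
```

In this model, shrinking the pool leaves both counters untouched because the loop condition is false from the start, which matches the preference stated in the comment not to special-case a smaller pool size.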
[GitHub] [flink] reswqa commented on pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on PR #22084:
URL: https://github.com/apache/flink/pull/22084#issuecomment-1457725482

Thanks @1996fanrui for the review and for catching the bug. I have updated the PR according to your comments; please take another look.
[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127489392

## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ##
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();

         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);

         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }

+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 2 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.requestMemorySegment()).isNull();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 10.
+        bufferPool.setNumBuffers(10);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(10);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        // available status will not be influenced by overdraft.
+        assertThat(bufferPool.isAvailable()).isTrue();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.isAvailable()).isTrue();

Review Comment:
Good catch! I think the second one is better. If the pool size becomes larger, some overdraft buffers need to be converted to requested buffers.
[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1127487679

## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ##
@@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() {
         localBufferPool.lazyDestroy();

         // All buffers have been requested, but can not be returned yet.
-        assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+        assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);

         // Recycle should return buffers to memory segment pool
         for (Buffer buffer : requests) {
             buffer.recycleBuffer();
         }
     }

+    @Test
+    void testDecreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2);
+        Queue buffers = new LinkedList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(5);
+
+        // request all buffer.
+        for (int i = 0; i < 5; i++) {
+            buffers.add(bufferPool.requestMemorySegmentBlocking());
+        }
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // request 1 overdraft buffers.
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // set pool size to 4.
+        bufferPool.setNumBuffers(4);
+        assertThat(bufferPool.getNumBuffers()).isEqualTo(4);
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        buffers.add(bufferPool.requestMemorySegmentBlocking());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return all overdraft buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isFalse();
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+        assertThat(bufferPool.isAvailable()).isFalse();
+
+        // return the excess buffer.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.isAvailable()).isFalse();
+        // return non-excess buffers.
+        bufferPool.recycle(buffers.poll());
+        assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+        assertThat(bufferPool.isAvailable()).isTrue();
+
+        while (!buffers.isEmpty()) {
+            bufferPool.recycle(buffers.poll());
+        }
+        bufferPool.lazyDestroy();
+    }
+
+    @Test
+    void testIncreasePoolSize() throws Exception {
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2);
+        List buffers = new ArrayList<>();
+
+        // set pool size to 5.
+        bufferPool.setNumBuffers(5);

Review Comment:
Good suggestion.