[GitHub] [flink-table-store] JingsongLi closed pull request #569: [FLINK-31269] Split hive connector to each module of each version
JingsongLi closed pull request #569: [FLINK-31269] Split hive connector to each module of each version URL: https://github.com/apache/flink-table-store/pull/569 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaoyunhaii commented on pull request #21736: [FLINK-27518][tests] Refactor migration tests to support version update automatically
gaoyunhaii commented on PR #21736: URL: https://github.com/apache/flink/pull/21736#issuecomment-1459669377 Hi @XComp, the option of having the information in the resource directory looks good to me; I'll update the PR and also squash the commits.
[jira] [Commented] (FLINK-31307) RocksDB:java.lang.UnsatisfiedLinkError
[ https://issues.apache.org/jira/browse/FLINK-31307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697765#comment-17697765 ] Yun Tang commented on FLINK-31307: -- What's the meaning of {{2.2.1}} after {{frocksdbjni-6.20.3-ververica-1.0.jar}}? One possible problem is that your application jar contains an open-source RocksDB, which lacks this method; it exists only in Flink's FRocksDB build. > RocksDB:java.lang.UnsatisfiedLinkError > -- > > Key: FLINK-31307 > URL: https://issues.apache.org/jira/browse/FLINK-31307 > Project: Flink > Issue Type: Bug >Affects Versions: 1.14.5 >Reporter: Wujunzhe >Priority: Major > Attachments: image-2023-03-03-10-27-04-810.png, > image-2023-03-03-10-29-27-477.png, image-2023-03-03-11-28-54-674.png > > > when i use rocksdb like > !image-2023-03-03-10-27-04-810.png! > > I got an unsolvable exception. > !image-2023-03-03-10-29-27-477.png! > What can I do to troubleshoot or solve this problem? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
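The classpath conflict Yun Tang suspects — an open-source `org.rocksdb:rocksdbjni` shadowing Flink's FRocksDB — is usually resolved by excluding the transitive RocksDB dependency from the application jar. A minimal sketch, assuming Maven; `com.example:some-library` is a placeholder for whichever dependency pulls RocksDB in transitively:

```xml
<!-- Hypothetical example: com.example:some-library stands in for the
     dependency that transitively pulls in org.rocksdb:rocksdbjni.
     Excluding it leaves only Flink's frocksdbjni on the classpath. -->
<dependency>
  <groupId>com.example</groupId>
  <artifactId>some-library</artifactId>
  <version>1.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.rocksdb</groupId>
      <artifactId>rocksdbjni</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

`mvn dependency:tree -Dincludes=org.rocksdb` shows which dependency drags the conflicting jar in.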
[GitHub] [flink] WencongLiu commented on pull request #22128: [FLINK-31298] backport to branch release-1.16
WencongLiu commented on PR #22128: URL: https://github.com/apache/flink/pull/22128#issuecomment-1459661390 cc @reswqa
[GitHub] [flink] WencongLiu commented on pull request #22129: [FLINK-31298] backport to branch release-1.17
WencongLiu commented on PR #22129: URL: https://github.com/apache/flink/pull/22129#issuecomment-1459661015 cc @reswqa
[GitHub] [flink] WencongLiu commented on pull request #22129: [FLINK-31298] backport to branch release-1.17
WencongLiu commented on PR #22129: URL: https://github.com/apache/flink/pull/22129#issuecomment-1459660239 cc @reswqa
[GitHub] [flink] flinkbot commented on pull request #22129: [FLINK-31298] backport to branch release-1.17
flinkbot commented on PR #22129: URL: https://github.com/apache/flink/pull/22129#issuecomment-1459658319 ## CI report: * 9cb47c24198a1806cffbfd41515500284f92d486 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #22128: [FLINK-31298] backport to branch release-1.16
flinkbot commented on PR #22128: URL: https://github.com/apache/flink/pull/22128#issuecomment-1459658196 ## CI report: * 6e3d0b7ae2fc6cea17d3822b4fcf6844c927171e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] WencongLiu opened a new pull request, #22129: [FLINK-31298] backport to branch release-1.17
WencongLiu opened a new pull request, #22129: URL: https://github.com/apache/flink/pull/22129 fix ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException
[GitHub] [flink] WencongLiu opened a new pull request, #22128: [FLINK-31298] backport to branch 1.16
WencongLiu opened a new pull request, #22128: URL: https://github.com/apache/flink/pull/22128 fix ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException
[GitHub] [flink] lincoln-lil commented on a diff in pull request #22127: [FLINK-31351][sql-gateway] Don't stop the stuck thread by force
lincoln-lil commented on code in PR #22127: URL: https://github.com/apache/flink/pull/22127#discussion_r1129043195 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java: ## @@ -183,12 +190,15 @@ public ResultSet fetchResults( /** Closes the {@link OperationManager} and all operations. */ public void close() { stateLock.writeLock().lock(); +Exception closeException = null; try { isRunning = false; -for (Operation operation : submittedOperations.values()) { -operation.close(); -} +List copiedOperations = new ArrayList<>(submittedOperations.values()); Review Comment: we can remove this copy and move the 'clear()' to finally clause ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java: ## @@ -30,26 +30,33 @@ import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.service.result.NotReadyResult; import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.service.utils.SqlCancelException; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Field; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.function.Supplier; +import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; Review Comment: nit: just use normal import?
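The two review suggestions above combine into one shape for `close()`: iterate the live map directly (no defensive copy), remember the first close failure, and move `clear()` into a `finally` block so the map is emptied even if a close throws. A standalone sketch under assumed names — this `Operation` interface and `closeAll` helper are illustrative, not the actual Flink `OperationManager` code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CloseSketch {
    interface Operation {
        void close() throws Exception;
    }

    // Close every operation, surface the first failure, and always clear
    // the registry -- the pattern the review comment asks for.
    static void closeAll(Map<String, Operation> submittedOperations) throws Exception {
        Exception closeException = null;
        try {
            for (Operation operation : submittedOperations.values()) {
                try {
                    operation.close();
                } catch (Exception e) {
                    if (closeException == null) {
                        closeException = e; // remember the first failure
                    }
                }
            }
        } finally {
            submittedOperations.clear(); // runs even when a close throws
        }
        if (closeException != null) {
            throw closeException;
        }
    }

    public static void main(String[] args) {
        Map<String, Operation> ops = new HashMap<>();
        final List<String> closed = new ArrayList<>();
        ops.put("a", () -> closed.add("a"));
        ops.put("b", () -> { throw new Exception("boom"); });
        Exception caught = null;
        try {
            closeAll(ops);
        } catch (Exception e) {
            caught = e;
        }
        System.out.println(ops.isEmpty());       // map cleared despite the failure
        System.out.println(caught.getMessage()); // first close failure surfaced
    }
}
```

Because `clear()` sits in `finally`, the copy of `submittedOperations.values()` in the PR becomes unnecessary: the loop finishes (or aborts) before the map is mutated.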
[jira] [Closed] (FLINK-30829) Make the backpressure tab could be sort by the backpressure level
[ https://issues.apache.org/jira/browse/FLINK-30829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-30829. -- Fix Version/s: 1.18.0 Resolution: Fixed master(1.18) via ed30a4f906f35654745ca2f694b540ed50e0e14e. > Make the backpressure tab could be sort by the backpressure level > - > > Key: FLINK-30829 > URL: https://issues.apache.org/jira/browse/FLINK-30829 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.17.0 >Reporter: Zhanghao Chen >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > [FLINK-29998|https://issues.apache.org/jira/browse/FLINK-29998] enables user > to sort the backpressure tab to see which task is busiest. Another common > scenario for backpressure analysis is to find which tasks are backpressured. > We should add support to sort the backpressure tab by backpressure level as > well.
[jira] [Updated] (FLINK-30829) Make the backpressure tab could be sort by the backpressure level
[ https://issues.apache.org/jira/browse/FLINK-30829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-30829: --- Affects Version/s: 1.18.0 > Make the backpressure tab could be sort by the backpressure level > - > > Key: FLINK-30829 > URL: https://issues.apache.org/jira/browse/FLINK-30829 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.17.0, 1.18.0 >Reporter: Zhanghao Chen >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > [FLINK-29998|https://issues.apache.org/jira/browse/FLINK-29998] enables user > to sort the backpressure tab to see which task is busiest. Another common > scenario for backpressure analysis is to find which tasks are backpressured. > We should add support to sort the backpressure tab by backpressure level as > well.
[GitHub] [flink] reswqa commented on pull request #22108: [FLINK-30829] Make the backpressure tab could be sort by busy/backpressure/idle seperately
reswqa commented on PR #22108: URL: https://github.com/apache/flink/pull/22108#issuecomment-1459628828 Thanks @yangjunhan, merged.
[GitHub] [flink] reswqa merged pull request #22108: [FLINK-30829] Make the backpressure tab could be sort by busy/backpressure/idle seperately
reswqa merged PR #22108: URL: https://github.com/apache/flink/pull/22108
[jira] [Commented] (FLINK-31356) Serialize garbled characters at checkpoint
[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697746#comment-17697746 ] Yanfei Lei commented on FLINK-31356: Were any POJO fields updated when restoring? Is it related to https://issues.apache.org/jira/browse/FLINK-21752? A common cause of "java.io.UTFDataFormatException: malformed input around byte" is that the read and write paths in the serializer are not symmetrical. > Serialize garbled characters at checkpoint > -- > > Key: FLINK-31356 > URL: https://issues.apache.org/jira/browse/FLINK-31356 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.13.6 >Reporter: John >Priority: Major > > > {panel:title=The last checkpoint of the program was successful} > 2023-03-07 08:33:16,085 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering > checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job > 8b5720a4a40f50b995c97c6fe5b93079. > 2023-03-07 08:33:16,918 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed > checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in > 849 ms). > 2023-03-07 08:33:16,918 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking > checkpoint 39126 as completed for source Source: kafkaDataStream. > 2023-03-07 08:36:10,444 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: > mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to > FAILED on container_e38_1676011848026_0012_01_02 @ (dataPort=44633). > java.lang.RuntimeException: Writing records to JDBC failed. 
> at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153) > ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6] > {panel} > {panel:title=But from this checkpoint restore, it can't be decoded} > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from > any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > ... 10 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed > when trying to restore heap backend > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) > ~[flink-dist_2.11-1.13.6.jar:1.13.6] > ... 10 more > Caused by: java.io.UTFDataFormatException: malformed input around byte 32 > at java.io.DataInputStream.readUTF(DataInputStream.java:656) > ~[?:1.8.0_201] > at java.io.DataInputStream.readUTF(DataInputStream.java:564) >
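The read/write symmetry point above can be reproduced outside Flink with plain `java.io` — this `SerializerSymmetry` class is an illustration, not the failing serializer. `readUTF` expects exactly the length-prefixed modified-UTF-8 bytes that `writeUTF` produced; a misaligned or corrupted stream fails with the same `UTFDataFormatException` seen in the report:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

public class SerializerSymmetry {

    static byte[] write(String name, int count) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(name);  // 2-byte length prefix + modified UTF-8 bytes
        out.writeInt(count);
        return bos.toByteArray();
    }

    static String readSymmetric(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        String name = in.readUTF(); // mirrors writeUTF
        int count = in.readInt();   // mirrors writeInt
        return name + ":" + count;
    }

    public static void main(String[] args) throws IOException {
        // Symmetric round trip works.
        System.out.println(readSymmetric(write("window-state", 7))); // window-state:7

        // Asymmetric/corrupt stream: declared UTF length is 1, but 0x80 is
        // not a valid leading byte in modified UTF-8, so readUTF fails.
        ByteArrayOutputStream bad = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bad);
        out.writeShort(1);
        out.writeByte(0x80);
        try {
            new DataInputStream(new ByteArrayInputStream(bad.toByteArray())).readUTF();
        } catch (UTFDataFormatException e) {
            System.out.println("caught: " + e.getMessage()); // the malformed-input message
        }
    }
}
```

Any state serializer whose `deserialize` reads a different field order, type, or encoding than `serialize` wrote will hit this on restore, which is why a schema change between checkpoint and restore is the first thing to rule out.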
[GitHub] [flink] reswqa commented on pull request #22108: [FLINK-30829] Make the backpressure tab could be sort by busy/backpressure/idle seperately
reswqa commented on PR #22108: URL: https://github.com/apache/flink/pull/22108#issuecomment-1459620850 @yangjunhan, Would you mind taking a look at this?
[jira] [Commented] (FLINK-31298) ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException
[ https://issues.apache.org/jira/browse/FLINK-31298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697743#comment-17697743 ] Weijie Guo commented on FLINK-31298: master(1.18) via 7c5b7be5bc165a9799f10b5761a6ff15edee43b6. release-1.15 waiting for CI. release-1.16 waiting for CI. > ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows > IllegalArgumentException > - > > Key: FLINK-31298 > URL: https://issues.apache.org/jira/browse/FLINK-31298 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.17.0, 1.15.3, 1.16.1 >Reporter: Matthias Pohl >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available, starter, test-stability > > FLINK-24156 introduced {{NetUtils.acceptWithoutTimeout}} which caused the > test to print the stacktrace of an {{IllegalArgumentException}}: > {code} > Exception in thread "Thread-0" java.lang.IllegalArgumentException: > serverSocket SO_TIMEOUT option must be 0 > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) > at > org.apache.flink.util.NetUtils.acceptWithoutTimeout(NetUtils.java:139) > at > org.apache.flink.runtime.net.ConnectionUtilsTest$1.run(ConnectionUtilsTest.java:83) > at java.lang.Thread.run(Thread.java:750) > {code} > This is also shown in the Maven output of CI runs and might cause confusion. > The test should be fixed.
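The usual shape of a fix for a test whose helper thread only prints a stack trace is to capture the `Throwable` in the spawned thread and surface it in the test thread after `join()`. A generic sketch — not the actual PR change; the thrown message below just mirrors the one in the report:

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadErrorCapture {
    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Thread helper = new Thread(() -> {
            try {
                // stand-in for the NetUtils.acceptWithoutTimeout(...) call
                throw new IllegalArgumentException("serverSocket SO_TIMEOUT option must be 0");
            } catch (Throwable t) {
                error.set(t); // remember the failure instead of printing it
            }
        });
        helper.start();
        helper.join();
        // The test thread now sees the helper's failure and can assert on it
        // (or rethrow it), instead of a swallowed trace in the Maven output.
        System.out.println(error.get().getMessage());
    }
}
```

In a JUnit test the final step would be an assertion on `error.get()` rather than a print, so an unexpected exception fails the test instead of confusing CI logs.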
[GitHub] [flink] reswqa commented on pull request #22110: [FLINK-31298] fix ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException
reswqa commented on PR #22110: URL: https://github.com/apache/flink/pull/22110#issuecomment-1459617373 @WencongLiu We should backport this to release-1.16 and release-1.17 as well.
[jira] [Closed] (FLINK-27258) Deployment Chinese document malformed text
[ https://issues.apache.org/jira/browse/FLINK-27258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jia Fan closed FLINK-27258. --- Resolution: Invalid > Deployment Chinese document malformed text > -- > > Key: FLINK-27258 > URL: https://issues.apache.org/jira/browse/FLINK-27258 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.12.7, 1.13.6, 1.14.4 >Reporter: Jia Fan >Assignee: Jia Fan >Priority: Minor > Labels: pull-request-available, stale-assigned > Attachments: image-2022-04-15-11-35-19-712.png > > > !image-2022-04-15-11-35-19-712.png! > Malformed text needs to be fixed.
[GitHub] [flink] reswqa merged pull request #22110: [FLINK-31298] fix ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException
reswqa merged PR #22110: URL: https://github.com/apache/flink/pull/22110
[GitHub] [flink-table-store] yuzelin commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store
yuzelin commented on PR #584: URL: https://github.com/apache/flink-table-store/pull/584#issuecomment-1459614486 I have met the same problem before. I've pushed some commits to try to solve it; you can rebase on master.
[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1129022590 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ## @@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() { localBufferPool.lazyDestroy(); // All buffers have been requested, but can not be returned yet. -assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool()); + assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers); // Recycle should return buffers to memory segment pool for (Buffer buffer : requests) { buffer.recycleBuffer(); } } +@Test +void testDecreasePoolSize() throws Exception { +final int maxMemorySegments = 10; +final int requiredMemorySegments = 4; +final int maxOverdraftBuffers = 2; +final int largePoolSize = 5; +final int smallPoolSize = 4; +LocalBufferPool bufferPool = +new LocalBufferPool( +networkBufferPool, +requiredMemorySegments, +maxMemorySegments, +0, +Integer.MAX_VALUE, +maxOverdraftBuffers); +Queue buffers = new LinkedList<>(); + +// set a larger pool size. +bufferPool.setNumBuffers(largePoolSize); +assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize); + +// request all buffer. +for (int i = 0; i < largePoolSize; i++) { +buffers.add(bufferPool.requestMemorySegmentBlocking()); +} +assertThat(bufferPool.isAvailable()).isFalse(); + +// request 1 overdraft buffers. +buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); + +// set a small pool size. 
+bufferPool.setNumBuffers(smallPoolSize); +assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize); +assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero(); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); +buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2); +assertThat(bufferPool.isAvailable()).isFalse(); + +// return all overdraft buffers. +bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); +bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero(); +assertThat(bufferPool.isAvailable()).isFalse(); + +// return the excess buffer. +bufferPool.recycle(buffers.poll()); +assertThat(bufferPool.isAvailable()).isFalse(); +// return non-excess buffers. +bufferPool.recycle(buffers.poll()); +assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isTrue(); + +while (!buffers.isEmpty()) { +bufferPool.recycle(buffers.poll()); +} +bufferPool.lazyDestroy(); +} + +@Test +void testIncreasePoolSize() throws Exception { Review Comment: Sure, I added a test for `largePoolSize = 7`. As for `largePoolSize=8`, I think it is no different from the current situation of `largePoolSize = 10`. It also exceeds the total number of buffers but does not reach `maxMemorySegments`. But for clarity, I changed the previous test to `largePoolSize = 8`, and added a test for the case where the total number of buffers reaches `maxMemorySegments`.
[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1129012456 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() { mayNotifyAvailable(toNotify); } +@GuardedBy("availableMemorySegments") private boolean shouldBeAvailable() { assert Thread.holdsLock(availableMemorySegments); -return !availableMemorySegments.isEmpty() -&& unavailableSubpartitionsCount == 0 -&& numberOfRequestedOverdraftMemorySegments == 0; +return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0; Review Comment: > Before we allows convert numberOfRequestedOverdraftMemorySegments to numberOfRequestedMemorySegments , there still be a situation where both availableMemorySegment and overdraft buffer are all not zero. Just as what I said, only after we supports convert `numberOfRequestedOverdraftMemorySegments` to `numberOfRequestedMemorySegments`, then can we avoid having both `available buffers` and `overdraft buffers`. It is not enough to just modify the following code as there are still concurrency problems. ``` if (!availableMemorySegments.isEmpty()) { segment = availableMemorySegments.poll(); } else if (isRequestedSizeReached()) { // Only when the buffer request reaches the upper limit(i.e. current pool size), // requests an overdraft buffer. segment = requestOverdraftMemorySegmentFromGlobal(); } ``` Of course, we have allowed this conversion now, but I'm not very sure whether there are other concurrency problems even though after this fix. I admit that we should not modify the current design for possible future bugs. But there are some subtleties here : I somehow think that this is a redundant judgment condition, and removing it can avoid bugs at the same time. In any case, your reason is also make sense and acceptable to me. Therefore, I will not strongly adhere to my opinion. 
But for the sake of safety, perhaps we should listen to the opinions of @wsry who knows more about the implementation of `LocalBufferPool` than me.
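The ordering being debated in this thread — serve from the local available queue first, and only request an overdraft buffer once the pool has handed out its full current size — can be sketched with a toy model. This is not the real `LocalBufferPool`; the class, counters, and the global-pool stand-in below are all simplified for illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class OverdraftSketch {
    final Deque<Integer> available = new ArrayDeque<>();
    final int poolSize;
    final int maxOverdraft;
    int requested; // buffers handed out from the pool itself
    int overdraft; // extra buffers "borrowed" beyond the pool size

    OverdraftSketch(int poolSize, int maxOverdraft) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
        for (int i = 0; i < poolSize; i++) {
            available.add(i);
        }
    }

    /** Returns a segment id, or null when the caller must wait. */
    Integer request() {
        if (!available.isEmpty()) {
            requested++;
            return available.poll();
        }
        if (requested >= poolSize && overdraft < maxOverdraft) {
            // Only when the current pool size is fully requested does the
            // overdraft path kick in, mirroring the PR's intended ordering.
            overdraft++;
            return 1000 + overdraft; // pretend this came from the global pool
        }
        return null;
    }

    boolean shouldBeAvailable() {
        // The thread debates whether "overdraft == 0" must also hold here;
        // with the available-first ordering above, a non-empty queue is
        // already the deciding signal.
        return !available.isEmpty();
    }

    public static void main(String[] args) {
        OverdraftSketch pool = new OverdraftSketch(2, 1);
        System.out.println(pool.request()); // 0    (from the available queue)
        System.out.println(pool.request()); // 1    (queue now drained)
        System.out.println(pool.request()); // 1001 (overdraft kicks in)
        System.out.println(pool.request()); // null (overdraft limit reached)
        System.out.println(pool.shouldBeAvailable()); // false
    }
}
```

The concurrency concern in the thread is exactly what this toy hides: in the real pool, `available`, `requested`, and `overdraft` are touched by multiple threads, so the available-first check and the overdraft request must happen under the same lock to keep the two counters consistent.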
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
1996fanrui commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() { mayNotifyAvailable(toNotify); } +@GuardedBy("availableMemorySegments") private boolean shouldBeAvailable() { assert Thread.holdsLock(availableMemorySegments); -return !availableMemorySegments.isEmpty() -&& unavailableSubpartitionsCount == 0 -&& numberOfRequestedOverdraftMemorySegments == 0; +return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0; Review Comment: > Actually, before overdraft buffer was introduced, the definition of available was very clear: There is at least one availableMemorySegment and no subpartitions has reached maxBuffersPerChannel. IMO, Introducing the overdraft mechanism should not break this protocol, overdraft buffer should not affect the judgment of availability. Sorry, the overdraft buffer should affect the judgment of availability. As we discussed before, the overdraft buffer is only used when the requested buffers reach the upper limit (pool size). In other words: _**overdraft buffers are only used after the LocalBufferPool is unavailable.**_ And why is the name `overdraft`? It temporarily uses some extra buffers outside the LocalBufferPool while the pool is unavailable. From the semantics of overdraft, if `numberOfRequestedOverdraftMemorySegments > 0`, then the LocalBufferPool must be unavailable. That's why I added it here. You can take a look at the background of FLIP-227[1]; its motivation is: when a task needs multiple network buffers to process a single record and the LocalBufferPool has no buffer (unavailable), it allows the task to overdraw some buffers to prevent the task from getting stuck. Why do you want to remove it?
I guess there was a bug before, that is, the overdraft buffer was used when the `requested buffer` did not reach the upper limit, and you have fixed it in this PR. ``` if (!availableMemorySegments.isEmpty()) { segment = availableMemorySegments.poll(); } else if (isRequestedSizeReached()) { // Only when the buffer request reaches the upper limit(i.e. current pool size), // requests an overdraft buffer. segment = requestOverdraftMemorySegmentFromGlobal(); } ``` > For me, the key point is that if we think "The availableMemorySegments is always empty when numberOfRequestedOverdraftMemorySegments != 0." is tenable. If it's tenable now and future, we can remove the ` && numberOfRequestedOverdraftMemorySegments == 0` here, if not, it cannot be removed. Based on your feedback, I prefer to keep it, because the root cause is that overdraft was misused in some cases, and you have fixed it. If there is another bug that misuses the overdraft buffer, we should fix that bug to ensure the overdraft buffer is used correctly, instead of marking the LocalBufferPool as available. Marking the LocalBufferPool available directly may cause other unexpected bugs. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
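The invariant described here, that overdraft buffers are handed out only once the pool has reached its current size, can be sketched as follows. The names are simplified stand-ins for the real `LocalBufferPool` members, not Flink API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the request path under discussion: an overdraft
// buffer may only be handed out once the pool is exhausted (unavailable).
class OverdraftPool {
    final Queue<Object> availableMemorySegments = new ArrayDeque<>();
    int numberOfRequestedMemorySegments = 0;
    int numberOfRequestedOverdraftMemorySegments = 0;
    final int currentPoolSize;
    final int maxOverdraftBuffersPerGate;

    OverdraftPool(int poolSize, int maxOverdraft) {
        this.currentPoolSize = poolSize;
        this.maxOverdraftBuffersPerGate = maxOverdraft;
        for (int i = 0; i < poolSize; i++) {
            availableMemorySegments.add(new Object());
        }
    }

    boolean isRequestedSizeReached() {
        return numberOfRequestedMemorySegments >= currentPoolSize;
    }

    /** Returns a segment, or null when neither ordinary nor overdraft buffers are left. */
    Object requestMemorySegment() {
        if (!availableMemorySegments.isEmpty()) {
            numberOfRequestedMemorySegments++;
            return availableMemorySegments.poll();
        } else if (isRequestedSizeReached()
                && numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate) {
            // Only after the pool is exhausted may the task overdraw
            // extra buffers (modeled here as fresh objects).
            numberOfRequestedOverdraftMemorySegments++;
            return new Object();
        }
        return null;
    }

    public static void main(String[] args) {
        OverdraftPool pool = new OverdraftPool(2, 1);
        pool.requestMemorySegment(); // ordinary
        pool.requestMemorySegment(); // ordinary, pool size now reached
        pool.requestMemorySegment(); // overdraft
        System.out.println(pool.numberOfRequestedOverdraftMemorySegments); // 1
    }
}
```

In this sketch the overdraft counter can only become non-zero after `availableMemorySegments` is empty, which is exactly the invariant the reviewers rely on when deciding whether the extra term in `shouldBeAvailable` is redundant.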
[jira] [Commented] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI
[ https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697736#comment-17697736 ] Shengkai Fang commented on FLINK-31351: --- I think the main cause is that Thread#stop is not safe. The problem here is that the worker thread is stopped by the canceler thread while it is still loading classes. However, a class can only be loaded once per classloader; once loading fails, subsequent ClassLoader#loadClass calls will throw exceptions. So I think we should just notify users that we cannot interrupt the thread in time, rather than stopping the thread by force. > HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 > times out on CI > - > > Key: FLINK-31351 > URL: https://issues.apache.org/jira/browse/FLINK-31351 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.0, 1.16.1, 1.18.0 >Reporter: David Morávek >Assignee: Shengkai Fang >Priority: Blocker > Labels: pull-request-available, test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908] > > {code:java} > Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 > tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000] > Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping) > Mar 06 18:28:56 at java.lang.Thread.sleep(Native Method) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown > Source) > Mar 06 18:28:56 at >
org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709) > Mar 06 18:28:56 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Mar 06 18:28:56 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Mar 06 18:28:56 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Mar 06 18:28:56 at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
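The cooperative alternative described in the comment above — interrupt, wait a bounded time, and warn instead of calling the unsafe `Thread#stop` — can be sketched as below. This is a generic illustration, not the actual SQL Gateway code.

```java
// Thread#stop can kill a worker mid-classload, leaving the classloader
// with a half-initialized class that fails every later loadClass call;
// interruption lets the worker reach a safe point first.
class CooperativeCancel {
    /** Interrupts the thread and waits; returns true iff it terminated in time. */
    static boolean cancel(Thread t, long timeoutMillis) {
        t.interrupt();
        try {
            t.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve our own interrupt status
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                // busy loop standing in for real work; blocking calls
                // would exit via InterruptedException instead
            }
        });
        worker.start();
        if (!cancel(worker, 1000)) {
            // Mirrors the fix: warn about a possible resource leak
            // rather than forcing the thread to stop.
            System.err.println("warning: could not interrupt worker in time");
        }
    }
}
```

When the worker cannot be interrupted in time, the caller only logs a warning, trading a possible resource leak for a classloader that stays consistent.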
[GitHub] [flink-table-store] FangYongs commented on pull request #426: [FLINK-30323] Support table statistics in table store
FangYongs commented on PR #426: URL: https://github.com/apache/flink-table-store/pull/426#issuecomment-1459593126 Hi @JingsongLi I have updated this PR, please help to review it again, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-31365) Simplify FlinkActionsE2ETest#testMergeInto
[ https://issues.apache.org/jira/browse/FLINK-31365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31365. Fix Version/s: table-store-0.4.0 Assignee: yuzelin Resolution: Fixed master: 65335ee90aa0b78a0bd8cf18ae351c7696148e23 > Simplify FlinkActionsE2ETest#testMergeInto > -- > > Key: FLINK-31365 > URL: https://issues.apache.org/jira/browse/FLINK-31365 > Project: Flink > Issue Type: Sub-task >Reporter: yuzelin >Assignee: yuzelin >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > Complicated test may cause failure in docker environment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31365) Simplify FlinkActionsE2ETest#testMergeInto
[ https://issues.apache.org/jira/browse/FLINK-31365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31365: --- Labels: pull-request-available (was: ) > Simplify FlinkActionsE2ETest#testMergeInto > -- > > Key: FLINK-31365 > URL: https://issues.apache.org/jira/browse/FLINK-31365 > Project: Flink > Issue Type: Sub-task >Reporter: yuzelin >Priority: Major > Labels: pull-request-available > > Complicated test may cause failure in docker environment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] JingsongLi merged pull request #587: [FLINK-31365] Fix unstable FlinkActionsE2ETest#testMergeInto by simplifying it
JingsongLi merged PR #587: URL: https://github.com/apache/flink-table-store/pull/587 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] FangYongs commented on pull request #569: [FLINK-31269] Split hive connector to each module of each version
FangYongs commented on PR #569: URL: https://github.com/apache/flink-table-store/pull/569#issuecomment-1459580158 > Have you tried to setup metastore in hive-2.1-cdh-6.3, and use flink-table-store-hive-catalog.jar in Flink side? Hi @JingsongLi I also tested with hive-2.1.1, and it works -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] FangYongs commented on pull request #569: [FLINK-31269] Split hive connector to each module of each version
FangYongs commented on PR #569: URL: https://github.com/apache/flink-table-store/pull/569#issuecomment-1459579297 Thanks @tsreaper DONE -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
1996fanrui commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ## @@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() { localBufferPool.lazyDestroy(); // All buffers have been requested, but can not be returned yet. -assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool()); + assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers); // Recycle should return buffers to memory segment pool for (Buffer buffer : requests) { buffer.recycleBuffer(); } } +@Test +void testDecreasePoolSize() throws Exception { +final int maxMemorySegments = 10; +final int requiredMemorySegments = 4; +final int maxOverdraftBuffers = 2; +final int largePoolSize = 5; +final int smallPoolSize = 4; +LocalBufferPool bufferPool = +new LocalBufferPool( +networkBufferPool, +requiredMemorySegments, +maxMemorySegments, +0, +Integer.MAX_VALUE, +maxOverdraftBuffers); +Queue buffers = new LinkedList<>(); + +// set a larger pool size. +bufferPool.setNumBuffers(largePoolSize); +assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize); + +// request all buffer.
+for (int i = 0; i < largePoolSize; i++) { +buffers.add(bufferPool.requestMemorySegmentBlocking()); +} +assertThat(bufferPool.isAvailable()).isFalse(); + +// request 1 overdraft buffers. +buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); + +// set a small pool size. +bufferPool.setNumBuffers(smallPoolSize); +
[GitHub] [flink] fsk119 commented on pull request #16532: [FLINK-13400]Remove Hive and Hadoop dependencies from SQL Client
fsk119 commented on PR #16532: URL: https://github.com/apache/flink/pull/16532#issuecomment-1459564895 @snuyanzin Sorry for the late response. I think SqlGatewayE2ECase is a good starting point to restart the work. We have introduced a Hive docker container in that test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] wuchong merged pull request #22104: [hotfix][docs] Fix the wrong configuration key of "table.exec.simplify-operator-name-enabled" in documentation
wuchong merged PR #22104: URL: https://github.com/apache/flink/pull/22104 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-31311) Supports Bounded Watermark streaming read
[ https://issues.apache.org/jira/browse/FLINK-31311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31311. Resolution: Fixed master: e715ac81a906e8f1b285d3847cc75879bb54d062 > Supports Bounded Watermark streaming read > - > > Key: FLINK-31311 > URL: https://issues.apache.org/jira/browse/FLINK-31311 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0 > > > There are some bounded-stream scenarios that require stream reading to be able to > end; generally speaking, ending by event time is preferable. > So this ticket supports writing the watermark to the snapshot and allows > specifying an ending watermark when reading the stream. -- This message was sent by Atlassian Jira (v8.20.10#820010)
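The idea in FLINK-31311 — stream reading ends once the watermark recorded in a snapshot passes a user-supplied bound — can be sketched minimally as below. `Snapshot` and its watermark accessor are hypothetical stand-ins, not the actual Table Store classes.

```java
import java.util.Iterator;
import java.util.List;

class BoundedWatermarkRead {
    /** Hypothetical snapshot carrying the watermark written at commit time. */
    record Snapshot(long id, long watermark) {}

    /** Consumes snapshots until one carries a watermark >= the bound; returns how many were read. */
    static long readUntil(Iterator<Snapshot> snapshots, long boundedWatermark) {
        long consumed = 0;
        while (snapshots.hasNext()) {
            Snapshot s = snapshots.next();
            consumed++;
            if (s.watermark() >= boundedWatermark) {
                break; // the bounded stream ends here
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Snapshot> log = List.of(
                new Snapshot(1, 100), new Snapshot(2, 200), new Snapshot(3, 300));
        System.out.println(readUntil(log.iterator(), 200)); // 2
    }
}
```

The key point is that the bound is evaluated against the watermark persisted in each snapshot, so a streaming read can terminate deterministically at an event-time boundary rather than at an arbitrary processing-time cutoff.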
[GitHub] [flink-table-store] JingsongLi merged pull request #576: [FLINK-31311] Supports Bounded Watermark streaming read
JingsongLi merged PR #576: URL: https://github.com/apache/flink-table-store/pull/576 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #22127: [FLINK-31351][sql-gateway] Don't stop the stuck thread by force
flinkbot commented on PR #22127: URL: https://github.com/apache/flink/pull/22127#issuecomment-1459422574 ## CI report: * 2331a3f92229c2cd6dfd0f2dc418f77fbc83e4c3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-31365) Simplify FlinkActionsE2ETest#testMergeInto
yuzelin created FLINK-31365: --- Summary: Simplify FlinkActionsE2ETest#testMergeInto Key: FLINK-31365 URL: https://issues.apache.org/jira/browse/FLINK-31365 Project: Flink Issue Type: Sub-task Reporter: yuzelin Complicated test may cause failure in docker environment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI
[ https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31351: --- Labels: pull-request-available test-stability (was: test-stability) > HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 > times out on CI > - > > Key: FLINK-31351 > URL: https://issues.apache.org/jira/browse/FLINK-31351 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.0, 1.16.1, 1.18.0 >Reporter: David Morávek >Assignee: Shengkai Fang >Priority: Blocker > Labels: pull-request-available, test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908] > > {code:java} > Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 > tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000] > Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping) > Mar 06 18:28:56 at java.lang.Thread.sleep(Native Method) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown > Source) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709) > Mar 06 18:28:56 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Mar 06 18:28:56 
at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Mar 06 18:28:56 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Mar 06 18:28:56 at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] fsk119 opened a new pull request, #22127: [FLINK-31351][sql-gateway] Don't stop the stuck thread by force
fsk119 opened a new pull request, #22127: URL: https://github.com/apache/flink/pull/22127 ## What is the purpose of the change *We should not stop the thread by force, because it may leave an inconsistent state. Instead, we just notify users that this has the possibility of causing a resource leak.* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI
[ https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang reassigned FLINK-31351: - Assignee: Shengkai Fang > HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 > times out on CI > - > > Key: FLINK-31351 > URL: https://issues.apache.org/jira/browse/FLINK-31351 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.0, 1.16.1, 1.18.0 >Reporter: David Morávek >Assignee: Shengkai Fang >Priority: Blocker > Labels: test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908] > > {code:java} > Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 > tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000] > Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping) > Mar 06 18:28:56 at java.lang.Thread.sleep(Native Method) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown > Source) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999) > Mar 06 18:28:56 at > org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709) > Mar 06 18:28:56 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Mar 06 18:28:56 at > 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Mar 06 18:28:56 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Mar 06 18:28:56 at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] reswqa closed pull request #22122: [BP-1.17][FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0.
reswqa closed pull request #22122: [BP-1.17][FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0. URL: https://github.com/apache/flink/pull/22122
[GitHub] [flink] reswqa commented on pull request #22122: [BP-1.17][FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0.
reswqa commented on PR #22122: URL: https://github.com/apache/flink/pull/22122#issuecomment-1459371675 Closed via 54c67e5e08c11ef9a538abbf14618f9e27be18f7.
[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1128947802 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ## @@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() { localBufferPool.lazyDestroy(); // All buffers have been requested, but can not be returned yet. -assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool()); + assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers); // Recycle should return buffers to memory segment pool for (Buffer buffer : requests) { buffer.recycleBuffer(); } } +@Test +void testDecreasePoolSize() throws Exception { +final int maxMemorySegments = 10; +final int requiredMemorySegments = 4; +final int maxOverdraftBuffers = 2; +final int largePoolSize = 5; +final int smallPoolSize = 4; +LocalBufferPool bufferPool = +new LocalBufferPool( +networkBufferPool, +requiredMemorySegments, +maxMemorySegments, +0, +Integer.MAX_VALUE, +maxOverdraftBuffers); +Queue buffers = new LinkedList<>(); + +// set a larger pool size. +bufferPool.setNumBuffers(largePoolSize); +assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize); + +// request all buffer. +for (int i = 0; i < largePoolSize; i++) { +buffers.add(bufferPool.requestMemorySegmentBlocking()); +} +assertThat(bufferPool.isAvailable()).isFalse(); + +// request 1 overdraft buffers. +buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); + +// set a small pool size. 
+bufferPool.setNumBuffers(smallPoolSize); +assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize); +assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero(); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); +buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2); +assertThat(bufferPool.isAvailable()).isFalse(); + +// return all overdraft buffers. +bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); +bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero(); +assertThat(bufferPool.isAvailable()).isFalse(); + +// return the excess buffer. +bufferPool.recycle(buffers.poll()); +assertThat(bufferPool.isAvailable()).isFalse(); +// return non-excess buffers. +bufferPool.recycle(buffers.poll()); +assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isTrue(); + +while (!buffers.isEmpty()) { +bufferPool.recycle(buffers.poll()); +} +bufferPool.lazyDestroy(); +} + +@Test +void testIncreasePoolSize() throws Exception { Review Comment: Agree with you, fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store
zhangjun0x01 commented on PR #584: URL: https://github.com/apache/flink-table-store/pull/584#issuecomment-1459354880 > @zhangjun0x01 Can we consider using the bucket number as the default streaming parallelism, and using parallelism inference only for the batch source? ok
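The strategy agreed on above — bucket count as the streaming default, split-based inference (capped) only for batch — can be sketched as a small pure function. This is a hypothetical illustration; the names are invented and not the actual flink-table-store API:

```java
// Hypothetical sketch of the parallelism choice discussed in this thread.
final class ParallelismChooser {
    static int choose(boolean isStreaming, int bucketCount, int inferredFromSplits, int maxParallelism) {
        if (isStreaming) {
            // streaming: default to the bucket number, one reader per bucket
            return bucketCount;
        }
        // batch: use split-based inference, capped so scheduling cannot fail
        // from requesting far more slots than the cluster has
        return Math.max(1, Math.min(inferredFromSplits, maxParallelism));
    }
}
```

The cap addresses the test failures mentioned earlier in the thread, where unbounded inference derived too much parallelism and scheduling failed.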
[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store
zhangjun0x01 commented on PR #584: URL: https://github.com/apache/flink-table-store/pull/584#issuecomment-1459353387 > I guess the test failures occur because too much parallelism was inferred, resulting in scheduling failures. Should we disable parallelism inference by default?
[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy
masteryhx commented on code in PR #21835: URL: https://github.com/apache/flink/pull/21835#discussion_r1128942248 ## flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java: ## @@ -437,18 +465,43 @@ public boolean runCleanupForEveryRecord() { DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY = new RocksdbCompactFilterCleanupStrategy(1000L); +/** + * Default value lets RocksDB control this feature as needed. For now, RocksDB will change + * this value to 30 days (i.e., 30 * 24 * 60 * 60) so that every file goes through the + * compaction process at least once every 30 days if not compacted sooner. + */ +static final long DEFAULT_PERIODIC_COMPACTION_SECONDS = 0xfffeL; Review Comment: Just as you commented in the frocksdb code: `Default: 0xfffe (allow RocksDB to auto-tune) For now, RocksDB will change this value to 30 days (i.e., 30 * 24 * 60 * 60) so that every file goes through the compaction process at least once every 30 days if not compacted sooner.` IIUC, this is the default value of the feature in RocksDB? Or do we need to introduce a specific default value for Flink that means not setting this option? WDYT?
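The sentinel semantics being discussed — `0xfffe` means "let RocksDB auto-tune", which RocksDB currently maps to 30 days, while `0` disables periodic compaction — can be modeled in a few lines. This is a toy model of the behavior described in the review comment, not the actual Flink or FRocksDB code:

```java
// Toy model of the periodic-compaction default semantics from the review.
final class PeriodicCompaction {
    // Sentinel meaning "let RocksDB auto-tune this option".
    static final long ROCKSDB_AUTO_TUNE = 0xfffeL;

    static long effectiveSeconds(long configured) {
        if (configured == ROCKSDB_AUTO_TUNE) {
            // RocksDB currently maps the sentinel to 30 days, so every file is
            // compacted at least once every 30 days if not compacted sooner.
            return 30L * 24 * 60 * 60;
        }
        return configured; // 0 turns periodic compaction off entirely
    }
}
```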
[jira] [Updated] (FLINK-30983) the security.ssl.algorithms configuration does not take effect in rest ssl
[ https://issues.apache.org/jira/browse/FLINK-30983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-30983: - Fix Version/s: 1.17.0 1.18.0 > the security.ssl.algorithms configuration does not take effect in rest ssl > -- > > Key: FLINK-30983 > URL: https://issues.apache.org/jira/browse/FLINK-30983 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.16.0 >Reporter: luyuan >Assignee: Yuxin Tan >Priority: Minor > Labels: pull-request-available > Fix For: 1.17.0, 1.18.0 > > Attachments: image-2023-02-09-15-58-36-254.png, > image-2023-02-09-15-58-43-963.png > > > The security.ssl.algorithms configuration does not take effect in rest ssl. > > SSLUtils#createRestNettySSLContext does not call SslContextBuilder#ciphers as > SSLUtils#createInternalNettySSLContext. > !image-2023-02-09-15-58-36-254.png! > > !image-2023-02-09-15-58-43-963.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
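The bug above is that the configured cipher list is applied when building the internal SSL context but skipped for the REST one (`SSLUtils#createRestNettySSLContext` never calls `SslContextBuilder#ciphers`). As a JDK-level analogy of the missing step — not the Netty `SslContextBuilder` code Flink actually uses — restricting cipher suites looks like:

```java
import javax.net.ssl.SSLParameters;

// JDK analogy for FLINK-30983: the configured algorithm list must be applied
// to *every* SSL context that is built, including the REST endpoint's.
public class CipherRestriction {
    static SSLParameters restrict(String[] configuredAlgorithms) {
        SSLParameters params = new SSLParameters();
        params.setCipherSuites(configuredAlgorithms); // without this call, JVM defaults apply
        return params;
    }
}
```

The design point is the same either way: a security option that is silently ignored for one of two code paths is worse than one that fails loudly, because operators believe the restriction is in force.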
[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy
masteryhx commented on code in PR #21835: URL: https://github.com/apache/flink/pull/21835#discussion_r1128940896 ## flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java: ## @@ -298,12 +298,40 @@ public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) { return this; } +/** + * Cleanup expired state while Rocksdb compaction is running. + * + * RocksDB compaction filter will query current timestamp, used to check expiration, from + * Flink every time after processing {@code queryTimeAfterNumEntries} number of state + * entries. Updating the timestamp more often can improve cleanup speed but it decreases + * compaction performance because it uses JNI call from native code. + * + * Periodic compaction could speed up expired state entries cleanup, especially for state + * entries rarely accessed. Files older than this value will be picked up for compaction, + * and re-written to the same level as they were before. It makes sure a file goes through + * compaction filters periodically. + * + * @param queryTimeAfterNumEntries number of state entries to process by compaction filter + * before updating current timestamp + * @param periodicCompactionSeconds periodic compaction per seconds which could speed up + * expired state cleanup. 0 means turning off periodic compaction. + */ +@Nonnull +public Builder cleanupInRocksdbCompactFilter( +long queryTimeAfterNumEntries, long periodicCompactionSeconds) { Review Comment: Sorry for the delayed update due to my personal business last month. Thanks for the suggestion. I replaced `long` with `Time` which is also the type of TTL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store
JingsongLi commented on PR #584: URL: https://github.com/apache/flink-table-store/pull/584#issuecomment-1459327789 @zhangjun0x01 Can we consider to use bucket number as default parallelism? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] JunRuiLee commented on a diff in pull request #22098: [FLINK-31144][coordination] Modify the judgment logic of whether to ignore the input locations of a ConsumePartitionGroup if the
JunRuiLee commented on code in PR #22098: URL: https://github.com/apache/flink/pull/22098#discussion_r1128940538 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java: ## @@ -95,8 +95,7 @@ private CompletableFuture> getPreferredLocations // consumers compared to the consumed partition group size. This is to avoid tasks // unevenly distributed on nodes when running batch jobs or running jobs in // session/standalone mode. -if ((double) consumedPartitionGroup.getConsumerVertexGroup().size() -/ consumedPartitionGroup.size() +if (consumedPartitionGroup.getConsumerVertexGroup().size() Review Comment: done
[GitHub] [flink] huwh commented on a diff in pull request #22098: [FLINK-31144][coordination] Modify the judgment logic of whether to ignore the input locations of a ConsumePartitionGroup if the corre
huwh commented on code in PR #22098: URL: https://github.com/apache/flink/pull/22098#discussion_r1128938614 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java: ## @@ -95,8 +95,7 @@ private CompletableFuture> getPreferredLocations // consumers compared to the consumed partition group size. This is to avoid tasks // unevenly distributed on nodes when running batch jobs or running jobs in // session/standalone mode. -if ((double) consumedPartitionGroup.getConsumerVertexGroup().size() -/ consumedPartitionGroup.size() +if (consumedPartitionGroup.getConsumerVertexGroup().size() Review Comment: typo: i -> in
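The diff in this thread drops the `(double)` cast from the ratio check. A common way to compare a ratio against a threshold without floating-point division is cross-multiplication; since the full condition is truncated in the quoted diff, the sketch below only illustrates the general technique with invented names, not the actual Flink condition:

```java
// Hypothetical sketch: test "consumers / partitions >= factor" without
// floating-point division, via cross-multiplication on integers.
final class RatioCheck {
    // consumers / partitions >= factor  <=>  consumers >= factor * partitions
    // (exact for non-negative ints, no rounding error from the division)
    static boolean tooManyConsumers(int consumers, int partitions, int factor) {
        return consumers >= factor * partitions;
    }
}
```

Besides avoiding rounding surprises, the integer form sidesteps division-by-zero when the partition group is empty (the product is simply 0).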
[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1128937275 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() { mayNotifyAvailable(toNotify); } +@GuardedBy("availableMemorySegments") private boolean shouldBeAvailable() { assert Thread.holdsLock(availableMemorySegments); -return !availableMemorySegments.isEmpty() -&& unavailableSubpartitionsCount == 0 -&& numberOfRequestedOverdraftMemorySegments == 0; +return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0; Review Comment: Before we allowed converting `numberOfRequestedOverdraftMemorySegments` into `numberOfRequestedMemorySegments`, there could still be a situation where both the available memory segments and the overdraft buffers were non-zero, and that state should obviously be defined as available. After that change, I'm not sure whether this situation still exists. I'm a little worried about potential multithreading bugs if we add `numberOfRequestedOverdraftMemorySegments == 0` back to `shouldBeAvailable`. For me, the key point is: if we believe that "`availableMemorySegments` is always empty when `numberOfRequestedOverdraftMemorySegments != 0`" holds, doesn't `!availableMemorySegments.isEmpty()` already imply `numberOfRequestedOverdraftMemorySegments == 0`? Even if adding it back would not introduce a bug, why should we impose a useless constraint? Actually, before the overdraft buffer was introduced, the definition of `available` was very clear: there is at least one `availableMemorySegment` and no subpartition has reached `maxBuffersPerChannel`. IMO, introducing the overdraft mechanism should not break this protocol; overdraft buffers should not affect the judgment of availability.
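The invariant argued in the review — if free segments are always zero while overdraft buffers are outstanding, then the extra `overdraft == 0` term in the availability predicate is redundant — can be checked on a toy model. This is a hypothetical model, not the real `LocalBufferPool`:

```java
// Toy model of the availability predicate discussed in the review thread.
final class AvailabilityModel {
    // Availability as defined before the overdraft mechanism existed:
    // at least one free segment and no subpartition at maxBuffersPerChannel.
    static boolean shouldBeAvailable(int availableSegments, int unavailableSubpartitions) {
        return availableSegments > 0 && unavailableSubpartitions == 0;
    }

    // Variant with the extra overdraft term. Under the invariant
    // "overdraft > 0 implies availableSegments == 0", both predicates
    // always agree, so the extra term is a redundant constraint.
    static boolean withOverdraftTerm(int availableSegments, int unavailableSubpartitions, int overdraft) {
        return availableSegments > 0 && unavailableSubpartitions == 0 && overdraft == 0;
    }
}
```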
[GitHub] [flink] flinkbot commented on pull request #22126: [Bug][FLINK-31363]KafkaSink failed to commit transactions under EXACTLY_ONCE semantics.
flinkbot commented on PR #22126: URL: https://github.com/apache/flink/pull/22126#issuecomment-1459297877 ## CI report: * 79c14fdf6cfa1d56740e66d78e7b737b803e7591 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-31363) KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
[ https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31363: --- Labels: pull-request-available (was: ) > KafkaSink failed to commit transactions under EXACTLY_ONCE semantics > > > Key: FLINK-31363 > URL: https://issues.apache.org/jira/browse/FLINK-31363 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.0, 1.16.1, 1.18.0 >Reporter: lightzhao >Priority: Major > Labels: pull-request-available > Attachments: image-2023-03-08-10-54-51-410.png > > > When KafkaSink starts Exactly once and no data is written to the topic during > a checkpoint, the transaction commit exception is triggered, with the > following exception. > [Transiting to fatal error state due to > org.apache.kafka.common.errors.InvalidTxnStateException: The producer > attempted a transactional operation in an invalid state.] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] lightzhao opened a new pull request, #22126: [Bug][FLINK-31363]KafkaSink failed to commit transactions under EXACTLY_ONCE semantics.
lightzhao opened a new pull request, #22126: URL: https://github.com/apache/flink/pull/22126 ## What is the purpose of the change [FLINK-31363](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-31363) When KafkaSink runs with EXACTLY_ONCE semantics and no data is written to the topic during a checkpoint, a transaction commit exception is triggered, with the following exception. https://user-images.githubusercontent.com/40714172/223610347-41dac9a6-ec87-40c8-9e77-d1eb93f39afd.png If data is written during the checkpoint, this exception does not occur. After the transaction attribute transactionStarted is changed to false, the transaction commit succeeds whether or not data was written. ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
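The failure mode described in the PR — committing a Kafka transaction in which nothing was sent trips `InvalidTxnStateException` — can be illustrated with a toy state machine. This is a hedged model of the behavior, with invented names; it is not the actual KafkaSink or producer code, and the real fix works through the producer's `transactionStarted` flag:

```java
// Toy model of FLINK-31363: a transaction that saw no records between
// begin and commit is skipped rather than committed, because committing
// an empty transaction can put the producer into an invalid state.
final class TxnModel {
    private boolean inTxn = false;
    private boolean sentRecords = false;

    void beginTransaction() { inTxn = true; sentRecords = false; }

    void send(String record) { sentRecords = true; }

    /** Returns what happened: "committed" or "skipped-empty". */
    String commitTransaction() {
        if (!inTxn) throw new IllegalStateException("no open transaction");
        inTxn = false;
        if (!sentRecords) {
            return "skipped-empty"; // avoid invalid-state error on empty commit
        }
        return "committed";
    }
}
```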
[jira] [Created] (FLINK-31364) [Flink] add metrics for TableStore
Ming Li created FLINK-31364: --- Summary: [Flink] add metrics for TableStore Key: FLINK-31364 URL: https://issues.apache.org/jira/browse/FLINK-31364 Project: Flink Issue Type: Improvement Components: Table Store Reporter: Ming Li Currently, relevant metrics are missing in {{{}Table Store{}}}, such as split consumption speed, commit information statistics, etc. We can add metrics for real-time monitoring of the {{{}Table Store{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
reswqa commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1128926904 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) { currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments); +// reset overdraft buffers +while (numberOfRequestedOverdraftMemorySegments > 0 +&& numberOfRequestedMemorySegments < currentPoolSize) { Review Comment: The reason why I didn't do this conversion here is that we have an upper limit on the number of overdraft buffers. If this conversion were allowed and the pool size became very small, `numberOfRequestedOverdraftMemorySegments` would exceed its upper limit. Of course, we could convert at most up to the upper limit, but that would make the logic a little complicated, and there seems to be no benefit to introducing this mechanism. WDYT?
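The accounting in the quoted `setNumBuffers` diff — outstanding overdraft buffers are re-classified as ordinary requested buffers while the ordinary count is still below the new pool size — can be captured in a toy model. This is a hypothetical sketch with simplified fields, not the real `LocalBufferPool`:

```java
// Toy model of the overdraft reset performed in setNumBuffers (see the
// while-loop in the quoted diff): growing the pool absorbs overdraft
// buffers into the ordinary count; shrinking it leaves them untouched.
final class PoolResizeModel {
    int requested;  // ordinary requested segments
    int overdraft;  // overdraft segments outstanding
    int poolSize;

    PoolResizeModel(int requested, int overdraft, int poolSize) {
        this.requested = requested;
        this.overdraft = overdraft;
        this.poolSize = poolSize;
    }

    void setNumBuffers(int newPoolSize) {
        poolSize = newPoolSize;
        while (overdraft > 0 && requested < poolSize) {
            overdraft--;   // re-classify one overdraft buffer...
            requested++;   // ...as an ordinary requested buffer
        }
    }
}
```

Note how this sidesteps the concern raised in the comment: the conversion only ever runs while `requested < poolSize`, so the overdraft count can only shrink here and never exceeds its configured maximum.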
[GitHub] [flink] masteryhx commented on pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1459246416 > Hey @masteryhx, do you plan to work on that for 1.18? Sure, I'd like to. Thanks a lot for the reminder, and sorry for the delayed update due to personal business last month. I will rework the PR this month.
[jira] [Comment Edited] (FLINK-31359) HsResultPartitionTest fails with fatal error
[ https://issues.apache.org/jira/browse/FLINK-31359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697703#comment-17697703 ] Weijie Guo edited comment on FLINK-31359 at 3/8/23 3:00 AM: master(1.18) via eff9b16799f162f31058be5acac567943a446dea. This issue only affects master(1.18). was (Author: weijie guo): master(1.18) via eff9b16799f162f31058be5acac567943a446dea. > HsResultPartitionTest fails with fatal error > > > Key: FLINK-31359 > URL: https://issues.apache.org/jira/browse/FLINK-31359 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Matthias Pohl >Assignee: Weijie Guo >Priority: Critical > Labels: test-stability > Fix For: 1.18.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46910=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8512 > {code} > Mar 07 13:20:39 [ERROR] Process Exit Code: 239 > Mar 07 13:20:39 [ERROR] Crashed tests: > Mar 07 13:20:39 [ERROR] > org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartitionTest > [...] > {code}
[jira] [Closed] (FLINK-31359) HsResultPartitionTest fails with fatal error
[ https://issues.apache.org/jira/browse/FLINK-31359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-31359. -- Resolution: Fixed master(1.18) via eff9b16799f162f31058be5acac567943a446dea.
[jira] [Updated] (FLINK-31359) HsResultPartitionTest fails with fatal error
[ https://issues.apache.org/jira/browse/FLINK-31359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-31359: --- Fix Version/s: 1.18.0
[GitHub] [flink] JunRuiLee commented on pull request #22098: [FLINK-31144][coordination] Modify the judgment logic of whether to ignore the input locations of a ConsumePartitionGroup if the correspond
JunRuiLee commented on PR #22098: URL: https://github.com/apache/flink/pull/22098#issuecomment-1459238292 @huwh @zhuzhurk Thanks for reviewing this PR. I've addressed the comments, PTAL.
[GitHub] [flink] reswqa merged pull request #22125: [FLINK-31360][test] Using an executor service which ignore rejectException as shutdown for HsResultPartitionTest.
reswqa merged PR #22125: URL: https://github.com/apache/flink/pull/22125
[jira] [Created] (FLINK-31363) KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
lightzhao created FLINK-31363: - Summary: KafkaSink failed to commit transactions under EXACTLY_ONCE semantics Key: FLINK-31363 URL: https://issues.apache.org/jira/browse/FLINK-31363 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.16.1, 1.17.0, 1.18.0 Reporter: lightzhao Attachments: image-2023-03-08-10-54-51-410.png When KafkaSink runs with EXACTLY_ONCE semantics and no data is written to the topic during a checkpoint, a transaction commit exception is triggered: [Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.]
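The reported failure mode is a commit of a transaction in which nothing was produced during the checkpoint interval. A toy state machine illustrates the idea of tracking whether the open transaction actually has pending records before committing; this is purely illustrative and is not the actual KafkaSink or Kafka producer API:

```java
// Toy illustration (hypothetical, not Flink or Kafka code): track whether
// the currently open transaction produced any records, and skip the commit
// on checkpoint when the transaction is empty.
class TxnState {
    private boolean recordsProduced = false;

    void send(String record) {
        recordsProduced = true; // a record was added to the open transaction
    }

    // Called on checkpoint: committing an empty transaction corresponds to
    // the invalid-state commit described in the issue, so it is skipped.
    String onCheckpoint() {
        if (!recordsProduced) {
            return "skip-commit";
        }
        recordsProduced = false; // a fresh transaction starts after commit
        return "commit";
    }
}
```

Whether the eventual fix takes this shape in the sink is an open question in the issue; the sketch only shows the state being tracked.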
[jira] [Commented] (FLINK-31298) ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException
[ https://issues.apache.org/jira/browse/FLINK-31298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697698#comment-17697698 ] Wencong Liu commented on FLINK-31298: - cc [~mapohl] > ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows > IllegalArgumentException > - > > Key: FLINK-31298 > URL: https://issues.apache.org/jira/browse/FLINK-31298 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.17.0, 1.15.3, 1.16.1 >Reporter: Matthias Pohl >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available, starter, test-stability > > FLINK-24156 introduced {{NetUtils.acceptWithoutTimeout}} which caused the > test to print the stacktrace of an {{IllegalArgumentException}}: > {code} > Exception in thread "Thread-0" java.lang.IllegalArgumentException: > serverSocket SO_TIMEOUT option must be 0 > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) > at > org.apache.flink.util.NetUtils.acceptWithoutTimeout(NetUtils.java:139) > at > org.apache.flink.runtime.net.ConnectionUtilsTest$1.run(ConnectionUtilsTest.java:83) > at java.lang.Thread.run(Thread.java:750) > {code} > This is also shown in the Maven output of CI runs and might cause confusion. > The test should be fixed.
[GitHub] [flink] WencongLiu commented on pull request #22121: [FLINK-27051] fix CompletedCheckpoint.DiscardObject.discard is not idempotent
WencongLiu commented on PR #22121: URL: https://github.com/apache/flink/pull/22121#issuecomment-1459222793 @flinkbot run azure
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached
1996fanrui commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1128900085 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() { mayNotifyAvailable(toNotify); } +@GuardedBy("availableMemorySegments") private boolean shouldBeAvailable() { assert Thread.holdsLock(availableMemorySegments); -return !availableMemorySegments.isEmpty() -&& unavailableSubpartitionsCount == 0 -&& numberOfRequestedOverdraftMemorySegments == 0; +return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0; Review Comment: Why the `numberOfRequestedOverdraftMemorySegments == 0` can be removed? I think there is a constraint here: The `availableMemorySegments` is always empty when `numberOfRequestedOverdraftMemorySegments > 0`. Sometimes this constraint does not hold before this PR, and we want to hold the constraint in the future, right? If yes, could you add some comments for `numberOfRequestedOverdraftMemorySegments`? It's helpful for other developers to understand the constraint. ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ## @@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() { localBufferPool.lazyDestroy(); // All buffers have been requested, but can not be returned yet. 
-assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool()); + assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers); // Recycle should return buffers to memory segment pool for (Buffer buffer : requests) { buffer.recycleBuffer(); } } +@Test +void testDecreasePoolSize() throws Exception { +final int maxMemorySegments = 10; +final int requiredMemorySegments = 4; +final int maxOverdraftBuffers = 2; +final int largePoolSize = 5; +final int smallPoolSize = 4; +LocalBufferPool bufferPool = +new LocalBufferPool( +networkBufferPool, +requiredMemorySegments, +maxMemorySegments, +0, +Integer.MAX_VALUE, +maxOverdraftBuffers); +Queue buffers = new LinkedList<>(); + +// set a larger pool size. +bufferPool.setNumBuffers(largePoolSize); +assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize); + +// request all buffer. +for (int i = 0; i < largePoolSize; i++) { +buffers.add(bufferPool.requestMemorySegmentBlocking()); +} +assertThat(bufferPool.isAvailable()).isFalse(); + +// request 1 overdraft buffers. +buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); + +// set a small pool size. +bufferPool.setNumBuffers(smallPoolSize); +assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize); +assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero(); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); +buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2); +assertThat(bufferPool.isAvailable()).isFalse(); + +// return all overdraft buffers. 
+bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isFalse(); +bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero(); +assertThat(bufferPool.isAvailable()).isFalse(); + +// return the excess buffer. +bufferPool.recycle(buffers.poll()); +assertThat(bufferPool.isAvailable()).isFalse(); +// return non-excess buffers. +bufferPool.recycle(buffers.poll()); +assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne(); +assertThat(bufferPool.isAvailable()).isTrue(); + +while (!buffers.isEmpty()) { +bufferPool.recycle(buffers.poll()); +} +bufferPool.lazyDestroy(); +} + +@Test +void testIncreasePoolSize() throws Exception { Review Comment: I prefer change the `testIncreasePoolSize` to the `testIncreasePoolSizeExceedTotalBuffers`. And the `testIncreasePoolSizeExceedTotalBuffers` and `testIncreasePoolSizeNotExceedTotalBuffers` as the normal
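The reviewer's question about dropping `numberOfRequestedOverdraftMemorySegments == 0` boils down to an invariant. A small standalone sketch (toy fields, not the real `LocalBufferPool`) of why the overdraft term is redundant only if that invariant holds:

```java
// Toy sketch of the availability predicate under discussion. Dropping the
// "no overdraft" term from shouldBeAvailable() is safe only under the
// invariant that no segment sits in availableMemorySegments while overdraft
// segments are outstanding.
class PoolAvailability {
    int availableMemorySegments;
    int unavailableSubpartitionsCount;
    int overdraft;

    boolean shouldBeAvailable() {
        // Invariant assumed by the simplified predicate: overdraft buffers
        // are only requested once the available queue is empty.
        if (overdraft > 0 && availableMemorySegments > 0) {
            throw new IllegalStateException("invariant violated");
        }
        return availableMemorySegments > 0 && unavailableSubpartitionsCount == 0;
    }
}
```

If the invariant holds, `overdraft > 0` implies the available queue is empty, so the extra conjunct in the original predicate never changed the result, which is the constraint the reviewer asks to have documented.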
[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor
Fanoid commented on code in PR #210: URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128905027 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorage.java: ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.sharedstorage; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** A shared storage to support access through subtasks of different operators. */ +class SharedStorage { +private static final Map, Object> m = Review Comment: Sure! I'll rename variables in this class. 
For the latter issue, I prefer to consider `SharedStorage` as a container of **references** to data items that provides sharing utilities among subtasks of operators. It is still up to the owner of the data items to worry about memory issues.
[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor
Fanoid commented on code in PR #210: URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128889422 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorage.java: ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.sharedstorage; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** A shared storage to support access through subtasks of different operators. 
*/ +class SharedStorage { +private static final Map, Object> m = +new ConcurrentHashMap<>(); + +private static final Map, String> owners = +new ConcurrentHashMap<>(); + +/** Gets a {@link Reader} of shared item identified by (storageID, subtaskId, descriptor). */ +static Reader getReader( +StorageID storageID, int subtaskId, ItemDescriptor descriptor) { +return new Reader<>(Tuple3.of(storageID, subtaskId, descriptor.key)); +} + +/** Gets a {@link Writer} of shared item identified by (storageID, subtaskId, key). */ +static Writer getWriter( +StorageID storageID, +int subtaskId, +ItemDescriptor descriptor, +String ownerId, +OperatorID operatorID, +StreamTask containingTask, +StreamingRuntimeContext runtimeContext, +StateInitializationContext stateInitializationContext) { +Tuple3 t = Tuple3.of(storageID, subtaskId, descriptor.key); +String lastOwner = owners.putIfAbsent(t, ownerId); +if (null != lastOwner) { +throw new IllegalStateException( +String.format( +"The shared item (%s, %s, %s) already has a writer %s.", +storageID, subtaskId, descriptor.key, ownerId)); +} +Writer writer = +new Writer<>( +t, +ownerId, +descriptor.serializer, +containingTask, +runtimeContext, +stateInitializationContext, +operatorID); +writer.set(descriptor.initVal); +return writer; +} + +static class Reader { +protected final Tuple3 t; + +Reader(Tuple3 t) { +this.t = t; +} + +T get() { +// It is possible that the `get` request of an item is triggered earlier than its +// initialization. In this case, we wait for a while. 
+long waitTime = 10; +do { +//noinspection unchecked +T value = (T) m.get(t); +if (null != value) { +return value; +} +try { +Thread.sleep(waitTime); +} catch (InterruptedException e) { +break; +} +waitTime *= 2; +} while (waitTime < 10 * 1000); +throw new IllegalStateException( +String.format("Failed to get value of %s after waiting %d ms.", t, waitTime)); +} +} + +static class Writer extends Reader { +private final String ownerId; +private final ListStateWithCache cache; +private boolean isDirty; + +Writer( +Tuple3 t, +String ownerId, +TypeSerializer serializer, +StreamTask containingTask, +StreamingRuntimeContext
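The waiting loop in `Reader#get` above is a capped exponential backoff. Extracted into a standalone sketch over a plain map (generic key instead of the shared-storage tuple key):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of the capped exponential-backoff poll from Reader#get:
// retry with a doubling sleep (starting at 10 ms) until a value appears or
// the wait budget is exhausted.
class BackoffGet {
    static <K, V> V getWithBackoff(Map<K, V> map, K key) {
        long waitTime = 10;
        do {
            V value = map.get(key);
            if (value != null) {
                return value; // value published by the writer side
            }
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            waitTime *= 2;
        } while (waitTime < 10_000);
        throw new IllegalStateException("no value for " + key + " after backoff");
    }
}
```

A usage note: because reads may race with the writer's initialization, the caller pays at most one sleep interval of latency per retry; a `CountDownLatch` or `CompletableFuture` per key would avoid polling entirely, at the cost of more bookkeeping.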
[GitHub] [flink] TanYuxin-tyx commented on pull request #22125: [FLINK-31360][test] Using an executor service which ignore rejectException as shutdown for HsResultPartitionTest.
TanYuxin-tyx commented on PR #22125: URL: https://github.com/apache/flink/pull/22125#issuecomment-1459208813 This change LGTM. I found similar solutions to the issue, e.g., in SchedulerFactory, KubernetesLeaderElector, etc.
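The approach referenced in this PR, an executor that ignores rejections after shutdown rather than throwing `RejectedExecutionException`, can be sketched with a plain `ThreadPoolExecutor` and the JDK's `DiscardPolicy` handler. Whether the PR uses exactly this shape is an assumption; the JDK classes and behavior are standard:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: a single-threaded executor that silently drops tasks submitted
// after shutdown instead of throwing RejectedExecutionException, which is
// useful in tests where late submissions during teardown are benign.
class QuietShutdownExecutor {
    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new ThreadPoolExecutor.DiscardPolicy()); // drop, don't reject
    }
}
```

After `shutdown()`, `execute(...)` routes the task to the rejection handler, and `DiscardPolicy` drops it without raising an exception.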
[jira] [Comment Edited] (FLINK-31346) Batch shuffle IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0
[ https://issues.apache.org/jira/browse/FLINK-31346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697402#comment-17697402 ] Weijie Guo edited comment on FLINK-31346 at 3/8/23 2:34 AM: master(1.18) via 5ad2ae2c24ade2655981f609298978d26329466f. release-1.17 via 54c67e5e08c11ef9a538abbf14618f9e27be18f7. was (Author: weijie guo): master(1.18) via 5ad2ae2c24ade2655981f609298978d26329466f. > Batch shuffle IO scheduler does not throw TimeoutException if > numRequestedBuffers is greater than 0 > --- > > Key: FLINK-31346 > URL: https://issues.apache.org/jira/browse/FLINK-31346 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.16.1 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > We currently rely on throwing an exception to trigger downstream task failover to > avoid a read-buffer request deadlock. But if {{numRequestedBuffers}} is greater > than 0, the IO scheduler does not throw a {{TimeoutException}}. This will cause a > deadlock.
[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor
Fanoid commented on code in PR #210: URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128873310 ## flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorageContextImpl.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.sharedstorage; + +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.BiConsumerWithException; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** Default implementation of {@link SharedStorageContext} using {@link SharedStorage}. 
*/ +@SuppressWarnings("rawtypes") +class SharedStorageContextImpl implements SharedStorageContext, Serializable { +private final StorageID storageID; +private final Map writers = new HashMap<>(); +private final Map readers = new HashMap<>(); +private Map, String> ownerMap; + +public SharedStorageContextImpl() { +this.storageID = new StorageID(); +} + +public void setOwnerMap(Map, String> ownerMap) { +this.ownerMap = ownerMap; +} + +@Override +public void invoke(BiConsumerWithException func) +throws Exception { +func.accept(this::getSharedItem, this::setSharedItem); +} + +private T getSharedItem(ItemDescriptor key) { +//noinspection unchecked +SharedStorage.Reader reader = readers.get(key); +Preconditions.checkState( +null != reader, +String.format( +"The operator requested to read a shared item %s not owned by itself.", +key)); +return reader.get(); +} + +private void setSharedItem(ItemDescriptor key, T value) { +//noinspection unchecked +SharedStorage.Writer writer = writers.get(key); +Preconditions.checkState( +null != writer, +String.format( +"The operator requested to read a shared item %s not owned by itself.", +key)); +writer.set(value); +} + +@Override +public & SharedStorageStreamOperator> void initializeState( +T operator, +StreamingRuntimeContext runtimeContext, +StateInitializationContext context) { +String ownerId = operator.getSharedStorageAccessorID(); +int subtaskId = runtimeContext.getIndexOfThisSubtask(); +for (Map.Entry, String> entry : ownerMap.entrySet()) { +ItemDescriptor descriptor = entry.getKey(); +if (ownerId.equals(entry.getValue())) { +writers.put( Review Comment: There are no instances to be overwritten here, because every subtask owns a distinct instance of `SharedStorageContextImpl`. The `context` field in operators is serialized and then deserialized into different instances when the operators are deployed, just like other fields.
[jira] [Closed] (FLINK-31343) Remove JMH dependency in flink-table-store-micro-benchmark
[ https://issues.apache.org/jira/browse/FLINK-31343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-31343. Resolution: Fixed master: d1c224890dad80c1d7664dc3011534b3ae1679f9 > Remove JMH dependency in flink-table-store-micro-benchmark > -- > > Key: FLINK-31343 > URL: https://issues.apache.org/jira/browse/FLINK-31343 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.4.0
[GitHub] [flink-table-store] JingsongLi merged pull request #585: [FLINK-31343] Remove JMH dependency in flink-table-store-micro-benchmark
JingsongLi merged PR #585: URL: https://github.com/apache/flink-table-store/pull/585
[GitHub] [flink-table-store] FangYongs commented on pull request #585: [FLINK-31343] Remove JMH dependency in flink-table-store-micro-benchmark
FangYongs commented on PR #585: URL: https://github.com/apache/flink-table-store/pull/585#issuecomment-1459150128 Thanks @JingsongLi LGTM +1
[GitHub] [flink] LadyForest commented on pull request #21452: [FLINK-30282] Fix Logical type ROW lost inner field's nullability after converting to RelDataType
LadyForest commented on PR #21452: URL: https://github.com/apache/flink/pull/21452#issuecomment-1459148347 > @LadyForest this change looks reasonable however now there are some conflicts... Do you mind if I ask you to rebase? Sure. I'll do the rebase.
[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor
Fanoid commented on code in PR #210: URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128853646 ## flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/operators/CalcLocalSplitsOperator.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.gbt.operators; + +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.iteration.IterationListener; +import org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache; +import org.apache.flink.iteration.operator.OperatorStateUtils; +import org.apache.flink.ml.common.gbt.defs.Histogram; +import org.apache.flink.ml.common.gbt.defs.LearningNode; +import org.apache.flink.ml.common.gbt.defs.Splits; +import org.apache.flink.ml.common.sharedstorage.SharedStorageContext; +import org.apache.flink.ml.common.sharedstorage.SharedStorageStreamOperator; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +/** Calculates local splits for assigned (nodeId, featureId) pairs. */ +public class CalcLocalSplitsOperator extends AbstractStreamOperator +implements OneInputStreamOperator, +IterationListener, +SharedStorageStreamOperator { + +private static final String SPLIT_FINDER_STATE_NAME = "split_finder"; +private final String sharedStorageAccessorID; +// States of local data. +private transient ListStateWithCache splitFinderState; +private transient SplitFinder splitFinder; +private transient SharedStorageContext sharedStorageContext; + +public CalcLocalSplitsOperator() { +sharedStorageAccessorID = getClass().getSimpleName() + "-" + UUID.randomUUID(); Review Comment: Actually, I've tried `StreamOperator#getOperatorID` before. However, the Operator ID cannot be obtained before execution. Then, we are unable to specify the owner map of share data items when building graph. 
Without the owner map, it is difficult to control access at runtime.
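The constructor-time ID generation described in the comment above can be sketched as follows. Because the runtime OperatorID is only assigned during execution, a unique accessor ID is generated when the operator object is constructed, i.e. while the job graph is still being built. The class and method names below are illustrative stand-ins, not Flink ML's actual API.

```java
import java.util.UUID;

// Sketch: generate a stable, per-instance accessor ID at construction time,
// so it is available while the graph is being built (unlike the OperatorID).
public class SharedStorageOperatorSketch {
    private final String sharedStorageAccessorID;

    public SharedStorageOperatorSketch() {
        // Unique per operator instance, usable as a key in an owner map.
        sharedStorageAccessorID = getClass().getSimpleName() + "-" + UUID.randomUUID();
    }

    public String getSharedStorageAccessorID() {
        return sharedStorageAccessorID;
    }
}
```

The trade-off discussed in the thread is that this ID is stable at graph-building time but, unlike `StreamOperator#getOperatorID`, is not derived from the runtime topology.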
[GitHub] [flink] fsk119 closed pull request #21638: [FLINK-29948][rest] Add rootCause in the ErrorResponseBody
fsk119 closed pull request #21638: [FLINK-29948][rest] Add rootCause in the ErrorResponseBody URL: https://github.com/apache/flink/pull/21638
[GitHub] [flink] weifonghsia commented on pull request #22085: PubSubSink should be using "withTopicName"
weifonghsia commented on PR #22085: URL: https://github.com/apache/flink/pull/22085#issuecomment-1459075862 @MartijnVisser - Would you be able to review this PR? Fairly small doc change
[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0
[ https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31362: --- Description: In Calcite 1.33.0, C-style escape strings have been supported. We could leverage it to enhance our string literals usage. issue: https://issues.apache.org/jira/browse/CALCITE-5305 was: In Calcite 1.33.0, C-style escape strings have been supported. We should outline its usage in document after upgrading to Calcite 1.33.0 > Upgrade to Calcite version to 1.33.0 > > > Key: FLINK-31362 > URL: https://issues.apache.org/jira/browse/FLINK-31362 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Aitozi >Priority: Major > > In Calcite 1.33.0, C-style escape strings have been supported. We could > leverage it to enhance our string literals usage. > issue: https://issues.apache.org/jira/browse/CALCITE-5305
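For context, the C-style escape string support added in Calcite 1.33.0 (CALCITE-5305) concerns SQL literals in which backslash escapes such as `\n` and `\t` are interpreted rather than taken literally. The following is a minimal, purely illustrative decoder for a few such escapes; it is not Calcite's implementation and the class name is an assumption.

```java
// Illustrative decoder for a few C-style escapes (\n, \t, \\) as they would
// appear inside an escaped SQL string literal. Not Calcite's actual code.
public class CStyleEscapes {
    public static String unescape(String s) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '\\' && i + 1 < s.length()) {
                char next = s.charAt(++i);
                switch (next) {
                    case 'n': out.append('\n'); break;
                    case 't': out.append('\t'); break;
                    case '\\': out.append('\\'); break;
                    default: out.append(next); // unknown escapes pass through
                }
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }
}
```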
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #546: [FLINK-31326] Consolidate source scaling logic
gyfora commented on code in PR #546: URL: https://github.com/apache/flink-kubernetes-operator/pull/546#discussion_r1128753730 ## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java: ## @@ -34,7 +33,11 @@ public enum MetricAggregator { this.getter = getter; } -public Optional get(AggregatedMetric metric) { -return Optional.ofNullable(metric).map(getter).filter(d -> !d.isNaN()); +public double get(AggregatedMetric metric) { +if (metric != null) { +return getter.apply(metric); +} else { +return Double.NaN; +} Review Comment: This change could be the reason why the lag growth rate may be NaN afterwards. Before this commit, after your pending-record-related change, the lag would be 0; now this may return Double.NaN.
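The behavioral difference flagged in the review above can be sketched as follows: the old getter filtered missing or NaN values into an empty `Optional`, while the new one surfaces `Double.NaN` directly, which downstream code must now handle. `AggregatedMetric` here is a simplified stand-in class, not Flink's.

```java
import java.util.Optional;
import java.util.function.Function;

// Sketch of the two getter styles discussed in the review. Not the actual
// MetricAggregator enum; "AggregatedMetric" is a simplified stand-in.
public class MetricAggregatorSketch {
    public static class AggregatedMetric {
        private final double sum;
        public AggregatedMetric(double sum) { this.sum = sum; }
        public Double getSum() { return sum; }
    }

    // Old behavior: null metrics and NaN values disappear into an empty Optional.
    public static Optional<Double> getOld(
            AggregatedMetric metric, Function<AggregatedMetric, Double> getter) {
        return Optional.ofNullable(metric).map(getter).filter(d -> !d.isNaN());
    }

    // New behavior: a missing metric yields Double.NaN instead of an absent value,
    // so callers that previously defaulted to 0 may now propagate NaN.
    public static double getNew(
            AggregatedMetric metric, Function<AggregatedMetric, Double> getter) {
        return metric != null ? getter.apply(metric) : Double.NaN;
    }
}
```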
[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0
[ https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31362: --- Component/s: Table SQL / API (was: Documentation) > Upgrade to Calcite version to 1.33.0 > > > Key: FLINK-31362 > URL: https://issues.apache.org/jira/browse/FLINK-31362 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: Aitozi >Priority: Major > > In Calcite 1.33.0, C-style escape strings have been supported. We should > outline its usage in document after upgrading to Calcite 1.33.0
[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0
[ https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31362: --- Summary: Upgrade to Calcite version to 1.33.0 (was: Add document about how to use C-style escape strings) > Upgrade to Calcite version to 1.33.0 > > > Key: FLINK-31362 > URL: https://issues.apache.org/jira/browse/FLINK-31362 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Aitozi >Priority: Major > > In Calcite 1.33.0, C-style escape strings have been supported. We should > outline its usage in document after upgrading to Calcite 1.33.0
[jira] [Updated] (FLINK-31362) Add document about how to use C-style escape strings
[ https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aitozi updated FLINK-31362: --- Description: In Calcite 1.33.0, C-style escape strings have been supported. We should outline its usage in document after upgrading to Calcite 1.33.0 > Add document about how to use C-style escape strings > > > Key: FLINK-31362 > URL: https://issues.apache.org/jira/browse/FLINK-31362 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Aitozi >Priority: Major > > In Calcite 1.33.0, C-style escape strings have been supported. We should > outline its usage in document after upgrading to Calcite 1.33.0
[jira] [Created] (FLINK-31362) Add document about how to use C-style escape strings
Aitozi created FLINK-31362: -- Summary: Add document about how to use C-style escape strings Key: FLINK-31362 URL: https://issues.apache.org/jira/browse/FLINK-31362 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Aitozi
[jira] [Commented] (FLINK-31361) job created by sql-client can't authenticate to kafka, can't find org.apache.kafka.common.security.plain.PlainLoginModule
[ https://issues.apache.org/jira/browse/FLINK-31361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697645#comment-17697645 ] David Anderson commented on FLINK-31361: I'm using the sql-client to create the job. > job created by sql-client can't authenticate to kafka, can't find > org.apache.kafka.common.security.plain.PlainLoginModule > - > > Key: FLINK-31361 > URL: https://issues.apache.org/jira/browse/FLINK-31361 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: David Anderson >Priority: Major > > I'm working with this SQL DDL: > {noformat} > CREATE TABLE pageviews_sink ( > `url` STRING, > `user_id` STRING, > `browser` STRING, > `ts` TIMESTAMP_LTZ(3) > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'pageviews', > 'properties.bootstrap.servers' = 'xxx.confluent.cloud:9092', > 'properties.security.protocol'='SASL_SSL', > 'properties.sasl.mechanism'='PLAIN', > > 'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule > required username="xxx" password="xxx";', > 'key.format' = 'json', > 'key.fields' = 'url', > 'value.format' = 'json' > ); > {noformat} > With {{flink-sql-connector-kafka-1.16.1.jar}} in the lib directory, this > fails with > {noformat} > Caused by: javax.security.auth.login.LoginException: No LoginModule found for > org.apache.kafka.common.security.plain.PlainLoginModule{noformat} > As a workaround I've found that it does work if I provide both > > {{flink-connector-kafka-1.16.1.jar}} > {{kafka-clients-3.2.3.jar}} > > in the lib directory. It seems like the relocation applied in the SQL > connector isn't working properly.
[jira] [Created] (FLINK-31361) job created by sql-client can't authenticate to kafka, can't find org.apache.kafka.common.security.plain.PlainLoginModule
David Anderson created FLINK-31361: -- Summary: job created by sql-client can't authenticate to kafka, can't find org.apache.kafka.common.security.plain.PlainLoginModule Key: FLINK-31361 URL: https://issues.apache.org/jira/browse/FLINK-31361 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.16.1 Reporter: David Anderson I'm working with this SQL DDL: {noformat} CREATE TABLE pageviews_sink ( `url` STRING, `user_id` STRING, `browser` STRING, `ts` TIMESTAMP_LTZ(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'pageviews', 'properties.bootstrap.servers' = 'xxx.confluent.cloud:9092', 'properties.security.protocol'='SASL_SSL', 'properties.sasl.mechanism'='PLAIN', 'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";', 'key.format' = 'json', 'key.fields' = 'url', 'value.format' = 'json' ); {noformat} With {{flink-sql-connector-kafka-1.16.1.jar}} in the lib directory, this fails with {noformat} Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule{noformat} As a workaround I've found that it does work if I provide both {{flink-connector-kafka-1.16.1.jar}} {{kafka-clients-3.2.3.jar}} in the lib directory. It seems like the relocation applied in the SQL connector isn't working properly.
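One way to diagnose the `LoginException` described in this report is to check whether the login module class is actually visible to the classloader in use; if shading relocated or dropped the class, `Class.forName` will fail for the original name. This is a diagnostic sketch only, not the connector's fix.

```java
// Diagnostic sketch: probe whether a JAAS login module class is loadable.
// If this returns false for
// org.apache.kafka.common.security.plain.PlainLoginModule, the
// "No LoginModule found" error is a classpath/relocation problem.
public class LoginModuleCheck {
    public static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

Running such a probe from within the Flink process (for example, a small job or the sql-client itself) matters, since each classloader may see a different set of jars.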
[jira] [Assigned] (FLINK-30609) Add ephemeral storage to CRD
[ https://issues.apache.org/jira/browse/FLINK-30609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-30609: -- Assignee: Prabcs (was: Matyas Orhidi) > Add ephemeral storage to CRD > > > Key: FLINK-30609 > URL: https://issues.apache.org/jira/browse/FLINK-30609 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Matyas Orhidi >Assignee: Prabcs >Priority: Major > Labels: starter > Fix For: kubernetes-operator-1.5.0 > > > We should consider adding ephemeral storage to the existing [resource > specification > |https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource]in > CRD, next to {{cpu}} and {{memory}} > https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#setting-requests-and-limits-for-local-ephemeral-storage
[jira] [Commented] (FLINK-30609) Add ephemeral storage to CRD
[ https://issues.apache.org/jira/browse/FLINK-30609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697635#comment-17697635 ] Gyula Fora commented on FLINK-30609: [~morhidi] do you mind if [~pbharaj] takes over this ticket? > Add ephemeral storage to CRD > > > Key: FLINK-30609 > URL: https://issues.apache.org/jira/browse/FLINK-30609 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Matyas Orhidi >Assignee: Matyas Orhidi >Priority: Major > Labels: starter > Fix For: kubernetes-operator-1.5.0 > > > We should consider adding ephemeral storage to the existing [resource > specification > |https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource]in > CRD, next to {{cpu}} and {{memory}} > https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#setting-requests-and-limits-for-local-ephemeral-storage