[GitHub] [flink-table-store] JingsongLi closed pull request #569: [FLINK-31269] Split hive connector to each module of each version

2023-03-07 Thread via GitHub


JingsongLi closed pull request #569: [FLINK-31269] Split hive connector to each 
module of each version
URL: https://github.com/apache/flink-table-store/pull/569





[GitHub] [flink] gaoyunhaii commented on pull request #21736: [FLINK-27518][tests] Refactor migration tests to support version update automatically

2023-03-07 Thread via GitHub


gaoyunhaii commented on PR #21736:
URL: https://github.com/apache/flink/pull/21736#issuecomment-1459669377

   Hi @XComp, the option of having the information in the resource directory 
looks good to me. I'll update the PR and also squash the commits. 





[jira] [Commented] (FLINK-31307) RocksDB:java.lang.UnsatisfiedLinkError

2023-03-07 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697765#comment-17697765
 ] 

Yun Tang commented on FLINK-31307:
--

What's the meaning of {{2.2.1}} after {{frocksdbjni-6.20.3-ververica-1.0.jar}}? 
One possible problem is that your application jar bundles an open-source 
RocksDB, which lacks this method; it exists only in Flink's FRocksDB build.
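A quick way to check is to print which jar the RocksDB classes are actually loaded from at runtime; a minimal diagnostic sketch (plain JDK reflection, not a Flink API):
{code:java}
public class RocksDbOriginCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        // If this prints your application jar instead of frocksdbjni-*.jar,
        // an open-source RocksDB is shadowing Flink's FRocksDB build.
        Class<?> clazz = Class.forName("org.rocksdb.RocksDB");
        System.out.println(clazz.getProtectionDomain().getCodeSource().getLocation());
    }
}
{code}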

> RocksDB:java.lang.UnsatisfiedLinkError
> --
>
> Key: FLINK-31307
> URL: https://issues.apache.org/jira/browse/FLINK-31307
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.5
>Reporter: Wujunzhe
>Priority: Major
> Attachments: image-2023-03-03-10-27-04-810.png, 
> image-2023-03-03-10-29-27-477.png, image-2023-03-03-11-28-54-674.png
>
>
> When I use RocksDB like 
> !image-2023-03-03-10-27-04-810.png!
>  
>  I got an exception that I cannot resolve. 
> !image-2023-03-03-10-29-27-477.png!
>  What can I do to troubleshoot or solve this problem? 
>  





[GitHub] [flink] WencongLiu commented on pull request #22128: [FLINK-31298] backport to branch release-1.16

2023-03-07 Thread via GitHub


WencongLiu commented on PR #22128:
URL: https://github.com/apache/flink/pull/22128#issuecomment-1459661390

   cc @reswqa 





[GitHub] [flink] WencongLiu commented on pull request #22129: [FLINK-31298] backport to branch release-1.17

2023-03-07 Thread via GitHub


WencongLiu commented on PR #22129:
URL: https://github.com/apache/flink/pull/22129#issuecomment-1459661015

   cc @reswqa 





[GitHub] [flink] WencongLiu commented on pull request #22129: [FLINK-31298] backport to branch release-1.17

2023-03-07 Thread via GitHub


WencongLiu commented on PR #22129:
URL: https://github.com/apache/flink/pull/22129#issuecomment-1459660239

   cc @reswqa 





[GitHub] [flink] flinkbot commented on pull request #22129: [FLINK-31298] backport to branch release-1.17

2023-03-07 Thread via GitHub


flinkbot commented on PR #22129:
URL: https://github.com/apache/flink/pull/22129#issuecomment-1459658319

   
   ## CI report:
   
   * 9cb47c24198a1806cffbfd41515500284f92d486 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] flinkbot commented on pull request #22128: [FLINK-31298] backport to branch release-1.16

2023-03-07 Thread via GitHub


flinkbot commented on PR #22128:
URL: https://github.com/apache/flink/pull/22128#issuecomment-1459658196

   
   ## CI report:
   
   * 6e3d0b7ae2fc6cea17d3822b4fcf6844c927171e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] WencongLiu opened a new pull request, #22129: [FLINK-31298] backport to branch release-1.17

2023-03-07 Thread via GitHub


WencongLiu opened a new pull request, #22129:
URL: https://github.com/apache/flink/pull/22129

   Fix ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows 
swallowing IllegalArgumentException





[GitHub] [flink] WencongLiu opened a new pull request, #22128: [FLINK-31298] backport to branch 1.16

2023-03-07 Thread via GitHub


WencongLiu opened a new pull request, #22128:
URL: https://github.com/apache/flink/pull/22128

   Fix ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows 
swallowing IllegalArgumentException





[GitHub] [flink] lincoln-lil commented on a diff in pull request #22127: [FLINK-31351][sql-gateway] Don't stop the stuck thread by force

2023-03-07 Thread via GitHub


lincoln-lil commented on code in PR #22127:
URL: https://github.com/apache/flink/pull/22127#discussion_r1129043195


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##
@@ -183,12 +190,15 @@ public ResultSet fetchResults(
 /** Closes the {@link OperationManager} and all operations. */
 public void close() {
 stateLock.writeLock().lock();
+Exception closeException = null;
 try {
 isRunning = false;
-for (Operation operation : submittedOperations.values()) {
-operation.close();
-}
+List<Operation> copiedOperations = new 
ArrayList<>(submittedOperations.values());

Review Comment:
   we can remove this copy and move the `clear()` into the finally clause
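   For illustration, one way to read this suggestion (a sketch, not the merged 
code; names taken from the quoted diff):
   ```
   public void close() {
       stateLock.writeLock().lock();
       try {
           isRunning = false;
           // Iterate the map directly; no defensive copy is needed while
           // holding the write lock.
           for (Operation operation : submittedOperations.values()) {
               operation.close();
           }
       } finally {
           // Clearing in finally empties the map even if a close() throws.
           submittedOperations.clear();
           stateLock.writeLock().unlock();
       }
   }
   ```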



##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##
@@ -30,26 +30,33 @@
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.result.NotReadyResult;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlCancelException;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.IOUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;

Review Comment:
   nit: just use a normal import?
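   For illustration, the non-static form would look roughly like this (sketch; 
the argument values are only illustrative):
   ```
   import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
   
   // The call site then names the class explicitly:
   Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
   ```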






[jira] [Closed] (FLINK-30829) Make the backpressure tab sortable by backpressure level

2023-03-07 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-30829.
--
Fix Version/s: 1.18.0
   Resolution: Fixed

master(1.18) via ed30a4f906f35654745ca2f694b540ed50e0e14e.

> Make the backpressure tab sortable by backpressure level
> -
>
> Key: FLINK-30829
> URL: https://issues.apache.org/jira/browse/FLINK-30829
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.17.0
>Reporter: Zhanghao Chen
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> [FLINK-29998|https://issues.apache.org/jira/browse/FLINK-29998] enables users 
> to sort the backpressure tab to see which task is busiest. Another common 
> scenario for backpressure analysis is finding which tasks are backpressured. 
> We should add support for sorting the backpressure tab by backpressure level 
> as well.





[jira] [Updated] (FLINK-30829) Make the backpressure tab sortable by backpressure level

2023-03-07 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-30829:
---
Affects Version/s: 1.18.0

> Make the backpressure tab sortable by backpressure level
> -
>
> Key: FLINK-30829
> URL: https://issues.apache.org/jira/browse/FLINK-30829
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Zhanghao Chen
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> [FLINK-29998|https://issues.apache.org/jira/browse/FLINK-29998] enables users 
> to sort the backpressure tab to see which task is busiest. Another common 
> scenario for backpressure analysis is finding which tasks are backpressured. 
> We should add support for sorting the backpressure tab by backpressure level 
> as well.





[GitHub] [flink] reswqa commented on pull request #22108: [FLINK-30829] Make the backpressure tab sortable by busy/backpressure/idle separately

2023-03-07 Thread via GitHub


reswqa commented on PR #22108:
URL: https://github.com/apache/flink/pull/22108#issuecomment-1459628828

   Thanks @yangjunhan, merged.





[GitHub] [flink] reswqa merged pull request #22108: [FLINK-30829] Make the backpressure tab sortable by busy/backpressure/idle separately

2023-03-07 Thread via GitHub


reswqa merged PR #22108:
URL: https://github.com/apache/flink/pull/22108





[jira] [Commented] (FLINK-31356) Serialize garbled characters at checkpoint

2023-03-07 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697746#comment-17697746
 ] 

Yanfei Lei commented on FLINK-31356:


Were any POJO fields updated before restoring? Is it related to 
https://issues.apache.org/jira/browse/FLINK-21752?

A common cause of "java.io.UTFDataFormatException: malformed input around byte" 
is that the reads and writes in the serializer are not symmetrical.
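For illustration, a minimal sketch (plain JDK streams, not Flink serializers) of how an asymmetric write/read produces exactly this exception:
{code:java}
import java.io.*;

public class AsymmetricSerDe {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeShort(1);   // the writer uses its own layout...
        out.writeByte(0x80); // ...and 0x80 is not a valid modified-UTF-8 start byte

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        in.readUTF();        // the reader assumes writeUTF() layout ->
                             // java.io.UTFDataFormatException: malformed input around byte 1
    }
}
{code}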

> Serialize garbled characters at checkpoint
> --
>
> Key: FLINK-31356
> URL: https://issues.apache.org/jira/browse/FLINK-31356
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.13.6
>Reporter: John
>Priority: Major
>
>  
> {panel:title=The last checkpoint of the program was successful}
> 2023-03-07 08:33:16,085 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
> checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 
> 8b5720a4a40f50b995c97c6fe5b93079.
> 2023-03-07 08:33:16,918 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed 
> checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 
> 849 ms).
> 2023-03-07 08:33:16,918 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 39126 as completed for source Source: kafkaDataStream.
> 2023-03-07 08:36:10,444 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to 
> FAILED on container_e38_1676011848026_0012_01_02 @  (dataPort=44633).
> java.lang.RuntimeException: Writing records to JDBC failed.
>     at 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
>  ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
> {panel}
> {panel:title=But from this checkpoint restore, it can't be decoded}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from 
> any of the 1 provided restore options.
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     ... 10 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     ... 10 more
> Caused by: java.io.UTFDataFormatException: malformed input around byte 32
>     at java.io.DataInputStream.readUTF(DataInputStream.java:656) 
> ~[?:1.8.0_201]
>     at java.io.DataInputStream.readUTF(DataInputStream.java:564) 
> 

[GitHub] [flink] reswqa commented on pull request #22108: [FLINK-30829] Make the backpressure tab sortable by busy/backpressure/idle separately

2023-03-07 Thread via GitHub


reswqa commented on PR #22108:
URL: https://github.com/apache/flink/pull/22108#issuecomment-1459620850

   @yangjunhan, would you mind taking a look at this?





[jira] [Commented] (FLINK-31298) ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException

2023-03-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697743#comment-17697743
 ] 

Weijie Guo commented on FLINK-31298:


master(1.18) via 7c5b7be5bc165a9799f10b5761a6ff15edee43b6.
release-1.15 waiting for CI.
release-1.16 waiting for CI.

> ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows 
> IllegalArgumentException
> -
>
> Key: FLINK-31298
> URL: https://issues.apache.org/jira/browse/FLINK-31298
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, starter, test-stability
>
> FLINK-24156 introduced {{NetUtils.acceptWithoutTimeout}}, which caused the 
> test to print the stacktrace of an {{IllegalArgumentException}}:
> {code}
> Exception in thread "Thread-0" java.lang.IllegalArgumentException: 
> serverSocket SO_TIMEOUT option must be 0
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>   at 
> org.apache.flink.util.NetUtils.acceptWithoutTimeout(NetUtils.java:139)
>   at 
> org.apache.flink.runtime.net.ConnectionUtilsTest$1.run(ConnectionUtilsTest.java:83)
>   at java.lang.Thread.run(Thread.java:750)
> {code}
> This is also shown in the Maven output of CI runs and might cause confusion. 
> The test should be fixed.
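For reference, the precondition the test trips over, as a minimal sketch (assuming the {{NetUtils.acceptWithoutTimeout(ServerSocket)}} signature implied by the stacktrace above):
{code:java}
ServerSocket serverSocket = new ServerSocket(0);
// SO_TIMEOUT defaults to 0; if anything sets it to a non-zero value,
// acceptWithoutTimeout fails its precondition with exactly this message:
// "serverSocket SO_TIMEOUT option must be 0"
serverSocket.setSoTimeout(0);
Socket socket = NetUtils.acceptWithoutTimeout(serverSocket);
{code}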





[GitHub] [flink] reswqa commented on pull request #22110: [FLINK-31298] fix ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException

2023-03-07 Thread via GitHub


reswqa commented on PR #22110:
URL: https://github.com/apache/flink/pull/22110#issuecomment-1459617373

   @WencongLiu We should also backport this to release-1.16 and release-1.17.





[jira] [Closed] (FLINK-27258) Deployment Chinese document malformed text

2023-03-07 Thread Jia Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jia Fan closed FLINK-27258.
---
Resolution: Invalid

> Deployment Chinese document malformed text
> --
>
> Key: FLINK-27258
> URL: https://issues.apache.org/jira/browse/FLINK-27258
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.12.7, 1.13.6, 1.14.4
>Reporter: Jia Fan
>Assignee: Jia Fan
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2022-04-15-11-35-19-712.png
>
>
> !image-2022-04-15-11-35-19-712.png!
> The malformed text needs to be fixed.





[GitHub] [flink] reswqa merged pull request #22110: [FLINK-31298] fix ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException

2023-03-07 Thread via GitHub


reswqa merged PR #22110:
URL: https://github.com/apache/flink/pull/22110





[GitHub] [flink-table-store] yuzelin commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store

2023-03-07 Thread via GitHub


yuzelin commented on PR #584:
URL: 
https://github.com/apache/flink-table-store/pull/584#issuecomment-1459614486

   I have met the same problem before. I've pushed some commits to try to solve 
it. You can rebase onto master.





[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1129022590


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
 localBufferPool.lazyDestroy();
 
 // All buffers have been requested, but can not be returned yet.
-assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
 // Recycle should return buffers to memory segment pool
 for (Buffer buffer : requests) {
 buffer.recycleBuffer();
 }
 }
 
+@Test
+void testDecreasePoolSize() throws Exception {
+final int maxMemorySegments = 10;
+final int requiredMemorySegments = 4;
+final int maxOverdraftBuffers = 2;
+final int largePoolSize = 5;
+final int smallPoolSize = 4;
+LocalBufferPool bufferPool =
+new LocalBufferPool(
+networkBufferPool,
+requiredMemorySegments,
+maxMemorySegments,
+0,
+Integer.MAX_VALUE,
+maxOverdraftBuffers);
+Queue<MemorySegment> buffers = new LinkedList<>();
+
+// set a larger pool size.
+bufferPool.setNumBuffers(largePoolSize);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+// request all buffer.
+for (int i = 0; i < largePoolSize; i++) {
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+}
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// request 1 overdraft buffers.
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// set a small pool size.
+bufferPool.setNumBuffers(smallPoolSize);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return all overdraft buffers.
+bufferPool.recycle(buffers.poll());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+bufferPool.recycle(buffers.poll());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return the excess buffer.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.isAvailable()).isFalse();
+// return non-excess buffers.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isTrue();
+
+while (!buffers.isEmpty()) {
+bufferPool.recycle(buffers.poll());
+}
+bufferPool.lazyDestroy();
+}
+
+@Test
+void testIncreasePoolSize() throws Exception {

Review Comment:
   Sure, I added a test for `largePoolSize = 7`.
   As for `largePoolSize=8`, I think it is no different from the current 
situation of `largePoolSize = 10`. It also exceeds the total number of buffers 
but does not reach `maxMemorySegments`. But for clarity, I changed the previous 
test to `largePoolSize = 8`, and added test for the case of total number of 
buffers reached `maxMemorySegments`.






[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1129012456


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
 mayNotifyAvailable(toNotify);
 }
 
+@GuardedBy("availableMemorySegments")
 private boolean shouldBeAvailable() {
 assert Thread.holdsLock(availableMemorySegments);
 
-return !availableMemorySegments.isEmpty()
-&& unavailableSubpartitionsCount == 0
-&& numberOfRequestedOverdraftMemorySegments == 0;
+return !availableMemorySegments.isEmpty() && 
unavailableSubpartitionsCount == 0;

Review Comment:
   > Before we allow converting numberOfRequestedOverdraftMemorySegments to 
numberOfRequestedMemorySegments, there can still be a situation where both 
availableMemorySegments and the overdraft buffers are non-empty.
   
   Just as I said, only after we support converting 
`numberOfRequestedOverdraftMemorySegments` to 
`numberOfRequestedMemorySegments` can we avoid having both `available 
buffers` and `overdraft buffers`. 
   
   It is not enough to just modify the following code, as there are still 
concurrency problems.
   ```
   if (!availableMemorySegments.isEmpty()) {
       segment = availableMemorySegments.poll();
   } else if (isRequestedSizeReached()) {
       // Only when the buffer request reaches the upper limit (i.e. current
       // pool size), requests an overdraft buffer.
       segment = requestOverdraftMemorySegmentFromGlobal();
   }
   ```
   
   Of course, we have allowed this conversion now, but I'm not very sure 
whether there are other concurrency problems even after this fix.
   
   I admit that we should not modify the current design for possible future 
bugs. But there are some subtleties here: I somehow think that this is a 
redundant judgment condition, and removing it avoids bugs at the same time.
   
   In any case, your reasoning also makes sense and is acceptable to me. 
Therefore, I will not strongly adhere to my opinion. But for the sake of 
safety, perhaps we should listen to the opinions of @wsry, who knows the 
implementation of `LocalBufferPool` better than I do.
   






[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
 mayNotifyAvailable(toNotify);
 }
 
+@GuardedBy("availableMemorySegments")
 private boolean shouldBeAvailable() {
 assert Thread.holdsLock(availableMemorySegments);
 
-return !availableMemorySegments.isEmpty()
-&& unavailableSubpartitionsCount == 0
-&& numberOfRequestedOverdraftMemorySegments == 0;
+return !availableMemorySegments.isEmpty() && 
unavailableSubpartitionsCount == 0;

Review Comment:
   > Actually, before the overdraft buffer was introduced, the definition of 
available was very clear: there is at least one availableMemorySegment and no 
subpartition has reached maxBuffersPerChannel. IMO, introducing the overdraft 
mechanism should not break this protocol; the overdraft buffer should not 
affect the judgment of availability.
   
   Sorry, the overdraft buffer should affect the judgment of availability.
   
   As we discussed before: an overdraft buffer is only used when the requested 
buffers have reached the upper limit (pool size). In other words: _**overdraft 
buffers are only used after the LocalBufferPool is unavailable.**_ 
   
   And why is it named `overdraft`? It temporarily uses some extra buffers 
outside the LocalBufferPool while the pool is unavailable. From the semantics 
of overdraft, if `numberOfRequestedOverdraftMemorySegments > 0`, then the 
LocalBufferPool must be unavailable. That's why I added it here.
   
   You can take a look at some background on FLIP-227 [1]; its motivation is: 
when a task needs multiple network buffers to process a single record and the 
LocalBufferPool has no buffer (unavailable), it allows the task to overdraw 
some buffers to prevent the task from getting stuck.
   
   Why do you want to remove it? I guess there was a bug before, that is, the 
overdraft buffer was used even when the `requested buffer` count did not reach 
the upper limit, and you have fixed it in this PR.
   
   ```
   if (!availableMemorySegments.isEmpty()) {
       segment = availableMemorySegments.poll();
   } else if (isRequestedSizeReached()) {
       // Only when the buffer request reaches the upper limit (i.e. current
       // pool size), requests an overdraft buffer.
       segment = requestOverdraftMemorySegmentFromGlobal();
   }
   ```
   
   > For me, the key point is whether we think "the availableMemorySegments is 
always empty when numberOfRequestedOverdraftMemorySegments != 0" is tenable.
   
   If it's tenable now and in the future, we can remove the `&& 
numberOfRequestedOverdraftMemorySegments == 0` here; if not, it cannot be 
removed.
   
   Based on your feedback, I prefer keeping it, because the root cause is that 
overdraft was misused in some cases, and you have fixed that.
   
   If another bug misuses the overdraft buffer, we should fix that bug to 
ensure the overdraft buffer is used correctly, instead of marking the 
LocalBufferPool as available. Marking the LocalBufferPool available directly 
may cause other unexpected bugs.
   
   [1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
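   To make the invariant under discussion concrete, a hypothetical assertion 
(field names from the quoted diff; this is not code from the PR):
   ```
   // Overdraft buffers may exist only while no segments are available, so if
   // this holds, the extra check in shouldBeAvailable() is indeed redundant.
   @GuardedBy("availableMemorySegments")
   private boolean overdraftImpliesUnavailable() {
       assert Thread.holdsLock(availableMemorySegments);
       return numberOfRequestedOverdraftMemorySegments == 0
               || availableMemorySegments.isEmpty();
   }
   ```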






[jira] [Commented] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI

2023-03-07 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697736#comment-17697736
 ] 

Shengkai Fang commented on FLINK-31351:
---

I think the main cause is that Thread#stop is not safe. The problem here is that 
the worker thread is stopped by the canceler thread while it is still loading 
classes. However, a class can only be loaded once per classloader: once loading 
fails, every subsequent ClassLoader#loadClass call will keep throwing the 
exception. So here I think we should just notify users that we cannot interrupt 
the thread in time, rather than stopping the thread by force.
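In other words, something along these lines (a hypothetical sketch of the proposed behavior, not the actual patch):
{code:java}
// Interrupt and wait; if the worker is still alive (e.g. stuck loading
// classes), report it to the user instead of the unsafe Thread#stop.
void cancel(Thread worker, long timeoutMillis) throws InterruptedException {
    worker.interrupt();
    worker.join(timeoutMillis);
    if (worker.isAlive()) {
        throw new IllegalStateException(
                "Operation thread did not terminate in time; refusing to force-stop it.");
    }
}
{code}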

> HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 
> times out on CI
> -
>
> Key: FLINK-31351
> URL: https://issues.apache.org/jira/browse/FLINK-31351
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: David Morávek
>Assignee: Shengkai Fang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908]
>  
> {code:java}
> Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 
> tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000]
> Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping)
> Mar 06 18:28:56   at java.lang.Thread.sleep(Native Method)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown
>  Source)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709)
> Mar 06 18:28:56   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 18:28:56   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 18:28:56   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 18:28:56   at java.lang.reflect.Method.invoke(Method.java:498)
>  {code}





[GitHub] [flink-table-store] FangYongs commented on pull request #426: [FLINK-30323] Support table statistics in table store

2023-03-07 Thread via GitHub


FangYongs commented on PR #426:
URL: 
https://github.com/apache/flink-table-store/pull/426#issuecomment-1459593126

   Hi @JingsongLi, I have updated this PR. Please help review it again, thanks.





[jira] [Closed] (FLINK-31365) Simplify FlinkActionsE2ETest#testMergeInto

2023-03-07 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31365.

Fix Version/s: table-store-0.4.0
 Assignee: yuzelin
   Resolution: Fixed

master: 65335ee90aa0b78a0bd8cf18ae351c7696148e23

> Simplify FlinkActionsE2ETest#testMergeInto
> --
>
> Key: FLINK-31365
> URL: https://issues.apache.org/jira/browse/FLINK-31365
> Project: Flink
>  Issue Type: Sub-task
>Reporter: yuzelin
>Assignee: yuzelin
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> A complicated test may cause failures in the Docker environment.





[jira] [Updated] (FLINK-31365) Simplify FlinkActionsE2ETest#testMergeInto

2023-03-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31365:
---
Labels: pull-request-available  (was: )

> Simplify FlinkActionsE2ETest#testMergeInto
> --
>
> Key: FLINK-31365
> URL: https://issues.apache.org/jira/browse/FLINK-31365
> Project: Flink
>  Issue Type: Sub-task
>Reporter: yuzelin
>Priority: Major
>  Labels: pull-request-available
>
> A complicated test may cause failures in the Docker environment.





[GitHub] [flink-table-store] JingsongLi merged pull request #587: [FLINK-31365] Fix unstable FlinkActionsE2ETest#testMergeInto by simplifying it

2023-03-07 Thread via GitHub


JingsongLi merged PR #587:
URL: https://github.com/apache/flink-table-store/pull/587





[GitHub] [flink-table-store] FangYongs commented on pull request #569: [FLINK-31269] Split hive connector to each module of each version

2023-03-07 Thread via GitHub


FangYongs commented on PR #569:
URL: 
https://github.com/apache/flink-table-store/pull/569#issuecomment-1459580158

   > Have you tried to setup metastore in hive-2.1-cdh-6.3, and use 
flink-table-store-hive-catalog.jar in Flink side?
   
   Hi @JingsongLi, I also tested with hive-2.1.1; it works.





[GitHub] [flink-table-store] FangYongs commented on pull request #569: [FLINK-31269] Split hive connector to each module of each version

2023-03-07 Thread via GitHub


FangYongs commented on PR #569:
URL: 
https://github.com/apache/flink-table-store/pull/569#issuecomment-1459579297

   Thanks @tsreaper, done.





[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128996203


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
 mayNotifyAvailable(toNotify);
 }
 
+@GuardedBy("availableMemorySegments")
 private boolean shouldBeAvailable() {
 assert Thread.holdsLock(availableMemorySegments);
 
-return !availableMemorySegments.isEmpty()
-&& unavailableSubpartitionsCount == 0
-&& numberOfRequestedOverdraftMemorySegments == 0;
+return !availableMemorySegments.isEmpty() && 
unavailableSubpartitionsCount == 0;

Review Comment:
   > Actually, before overdraft buffer was introduced, the definition of available 
was very clear: there is at least one availableMemorySegment and no subpartition 
has reached maxBuffersPerChannel. IMO, introducing the overdraft mechanism should 
not break this protocol; the overdraft buffer should not affect the judgment of 
availability.
   
   Sorry, but the overdraft buffer should affect the judgment of availability.
   
   As we discussed before: the overdraft buffer is only used when the number of 
requested buffers has reached the upper limit (the pool size). In other words: an 
overdraft buffer should only be requested after the LocalBufferPool has become 
unavailable.
   
   And why is it named `overdraft`? Because it temporarily uses some extra buffers 
outside the LocalBufferPool. From the semantics of overdraft, if 
`numberOfRequestedOverdraftMemorySegments > 0`, then the LocalBufferPool must be 
unavailable. That's why I added the check here.
   
   Why do you want to remove it? I guess there was a bug before, namely that the 
overdraft buffer was used even when the number of requested buffers had not 
reached the upper limit, and you have fixed that in this PR.
   
   ```
   if (!availableMemorySegments.isEmpty()) {
       segment = availableMemorySegments.poll();
   } else if (isRequestedSizeReached()) {
       // Only when the buffer request reaches the upper limit(i.e. current pool size),
       // requests an overdraft buffer.
       segment = requestOverdraftMemorySegmentFromGlobal();
   }
   ```
   
   > For me, the key point is that if we think "The availableMemorySegments is 
always empty when numberOfRequestedOverdraftMemorySegments != 0." is tenable.
   
   If it's tenable now and in the future, we can remove the 
`&& numberOfRequestedOverdraftMemorySegments == 0` here; if not, it cannot be 
removed.
   
   Based on your feedback, I prefer to keep it, because the root cause is that the 
overdraft mechanism was misused in some cases, and you have fixed that.
   
   If there is another bug that misuses the overdraft buffer, we should fix that 
bug to ensure the overdraft buffer is used correctly, instead of marking the 
LocalBufferPool as available. Marking the LocalBufferPool as available directly 
may cause other unexpected bugs.
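
   To make the constraint concrete, here is a minimal, self-contained sketch 
(assuming the invariant above holds; `AvailabilitySketch` and its plain-Java 
fields are illustrative stand-ins for the `LocalBufferPool` members, not the 
actual Flink code):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   
   /** Sketch only: restates the availability invariant discussed in this thread. */
   final class AvailabilitySketch {
       private final Queue<Object> availableMemorySegments = new ArrayDeque<>();
       private int numberOfRequestedOverdraftMemorySegments;
       private int unavailableSubpartitionsCount;
   
       boolean shouldBeAvailable() {
           // Invariant claimed above: overdraft buffers are only requested once the
           // pool is exhausted, so these two conditions can never hold at once.
           assert availableMemorySegments.isEmpty()
                   || numberOfRequestedOverdraftMemorySegments == 0;
           return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
       }
   }
   ```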



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
 localBufferPool.lazyDestroy();
 
 // All buffers have been requested, but can not be returned yet.
-assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
 // Recycle should return buffers to memory segment pool
 for (Buffer buffer : requests) {
 buffer.recycleBuffer();
 }
 }
 
+@Test
+void testDecreasePoolSize() throws Exception {
+final int maxMemorySegments = 10;
+final int requiredMemorySegments = 4;
+final int maxOverdraftBuffers = 2;
+final int largePoolSize = 5;
+final int smallPoolSize = 4;
+LocalBufferPool bufferPool =
+new LocalBufferPool(
+networkBufferPool,
+requiredMemorySegments,
+maxMemorySegments,
+0,
+Integer.MAX_VALUE,
+maxOverdraftBuffers);
+Queue<MemorySegment> buffers = new LinkedList<>();
+
+// set a larger pool size.
+bufferPool.setNumBuffers(largePoolSize);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+// request all buffer.
+for (int i = 0; i < largePoolSize; i++) {
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+}
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// request 1 overdraft buffers.
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// set a small pool size.
+bufferPool.setNumBuffers(smallPoolSize);
+   

[GitHub] [flink] fsk119 commented on pull request #16532: [FLINK-13400]Remove Hive and Hadoop dependencies from SQL Client

2023-03-07 Thread via GitHub


fsk119 commented on PR #16532:
URL: https://github.com/apache/flink/pull/16532#issuecomment-1459564895

   @snuyanzin Sorry for the late response. I think SqlGatewayE2ECase is a good 
starting point for restarting this work. We have already introduced a Hive 
docker container in that test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wuchong merged pull request #22104: [hotfix][docs] Fix the wrong configuration key of "table.exec.simplify-operator-name-enabled" in documentation

2023-03-07 Thread via GitHub


wuchong merged PR #22104:
URL: https://github.com/apache/flink/pull/22104


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-31311) Supports Bounded Watermark streaming read

2023-03-07 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31311.

Resolution: Fixed

master: e715ac81a906e8f1b285d3847cc75879bb54d062

> Supports Bounded Watermark streaming read
> -
>
> Key: FLINK-31311
> URL: https://issues.apache.org/jira/browse/FLINK-31311
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> There are some bounded-stream scenarios that require stream reading to be able 
> to end. Generally speaking, ending at an event time is best.
> So this ticket supports writing the watermark to the snapshot, and allows 
> specifying the ending watermark when reading the stream.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #576: [FLINK-31311] Supports Bounded Watermark streaming read

2023-03-07 Thread via GitHub


JingsongLi merged PR #576:
URL: https://github.com/apache/flink-table-store/pull/576


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22127: [FLINK-31351][sql-gateway] Don't stop the stuck thread by force

2023-03-07 Thread via GitHub


flinkbot commented on PR #22127:
URL: https://github.com/apache/flink/pull/22127#issuecomment-1459422574

   
   ## CI report:
   
   * 2331a3f92229c2cd6dfd0f2dc418f77fbc83e4c3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31365) Simplify FlinkActionsE2ETest#testMergeInto

2023-03-07 Thread yuzelin (Jira)
yuzelin created FLINK-31365:
---

 Summary: Simplify FlinkActionsE2ETest#testMergeInto
 Key: FLINK-31365
 URL: https://issues.apache.org/jira/browse/FLINK-31365
 Project: Flink
  Issue Type: Sub-task
Reporter: yuzelin


A complicated test may cause failures in the Docker environment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI

2023-03-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31351:
---
Labels: pull-request-available test-stability  (was: test-stability)

> HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 
> times out on CI
> -
>
> Key: FLINK-31351
> URL: https://issues.apache.org/jira/browse/FLINK-31351
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: David Morávek
>Assignee: Shengkai Fang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908]
>  
> {code:java}
> Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 
> tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000]
> Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping)
> Mar 06 18:28:56   at java.lang.Thread.sleep(Native Method)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown
>  Source)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709)
> Mar 06 18:28:56   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 18:28:56   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 18:28:56   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 18:28:56   at java.lang.reflect.Method.invoke(Method.java:498)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fsk119 opened a new pull request, #22127: [FLINK-31351][sql-gateway] Don't stop the stuck thread by force

2023-03-07 Thread via GitHub


fsk119 opened a new pull request, #22127:
URL: https://github.com/apache/flink/pull/22127

   
   
   ## What is the purpose of the change
   
   *We should not stop the thread by force because doing so may leave the system 
in an inconsistent state. Instead, we just notify the users that a resource leak 
is possible.*
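
   A hedged sketch of the cooperative-cancellation pattern this change moves 
toward (the executor, timeout, and warning message are illustrative, not the 
gateway's actual code):
   
   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   /** Illustrative only: cancel via interrupt and warn, never Thread#stop. */
   final class CancelByInterrupt {
       public static void main(String[] args) throws InterruptedException, ExecutionException {
           ExecutorService executor = Executors.newSingleThreadExecutor();
           Future<?> statement =
                   executor.submit(
                           () -> {
                               while (!Thread.currentThread().isInterrupted()) {
                                   // simulate a long-running statement
                               }
                           });
           try {
               statement.get(1, TimeUnit.SECONDS);
           } catch (TimeoutException e) {
               // Interrupt cooperatively; a thread that ignores the interrupt may
               // keep running, so surface a warning about a possible resource leak
               // instead of force-stopping it.
               statement.cancel(true);
               System.err.println("Statement did not stop in time; resources may leak.");
           }
           executor.shutdownNow();
       }
   }
   ```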
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI

2023-03-07 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reassigned FLINK-31351:
-

Assignee: Shengkai Fang

> HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 
> times out on CI
> -
>
> Key: FLINK-31351
> URL: https://issues.apache.org/jira/browse/FLINK-31351
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: David Morávek
>Assignee: Shengkai Fang
>Priority: Blocker
>  Labels: test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908]
>  
> {code:java}
> Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 
> tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000]
> Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping)
> Mar 06 18:28:56   at java.lang.Thread.sleep(Native Method)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown
>  Source)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709)
> Mar 06 18:28:56   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 18:28:56   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 18:28:56   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 18:28:56   at java.lang.reflect.Method.invoke(Method.java:498)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa closed pull request #22122: [BP-1.17][FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0.

2023-03-07 Thread via GitHub


reswqa closed pull request #22122: [BP-1.17][FLINK-31346][runtime] IO scheduler 
does not throw TimeoutException if numRequestedBuffers is greater than 0.
URL: https://github.com/apache/flink/pull/22122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #22122: [BP-1.17][FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0.

2023-03-07 Thread via GitHub


reswqa commented on PR #22122:
URL: https://github.com/apache/flink/pull/22122#issuecomment-1459371675

   closed via 54c67e5e08c11ef9a538abbf14618f9e27be18f7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128947802


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
 localBufferPool.lazyDestroy();
 
 // All buffers have been requested, but can not be returned yet.
-assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
 // Recycle should return buffers to memory segment pool
 for (Buffer buffer : requests) {
 buffer.recycleBuffer();
 }
 }
 
+@Test
+void testDecreasePoolSize() throws Exception {
+final int maxMemorySegments = 10;
+final int requiredMemorySegments = 4;
+final int maxOverdraftBuffers = 2;
+final int largePoolSize = 5;
+final int smallPoolSize = 4;
+LocalBufferPool bufferPool =
+new LocalBufferPool(
+networkBufferPool,
+requiredMemorySegments,
+maxMemorySegments,
+0,
+Integer.MAX_VALUE,
+maxOverdraftBuffers);
+Queue<MemorySegment> buffers = new LinkedList<>();
+
+// set a larger pool size.
+bufferPool.setNumBuffers(largePoolSize);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+// request all buffer.
+for (int i = 0; i < largePoolSize; i++) {
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+}
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// request 1 overdraft buffers.
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// set a small pool size.
+bufferPool.setNumBuffers(smallPoolSize);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return all overdraft buffers.
+bufferPool.recycle(buffers.poll());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+bufferPool.recycle(buffers.poll());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return the excess buffer.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.isAvailable()).isFalse();
+// return non-excess buffers.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isTrue();
+
+while (!buffers.isEmpty()) {
+bufferPool.recycle(buffers.poll());
+}
+bufferPool.lazyDestroy();
+}
+
+@Test
+void testIncreasePoolSize() throws Exception {

Review Comment:
   Agree with you, fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store

2023-03-07 Thread via GitHub


zhangjun0x01 commented on PR #584:
URL: 
https://github.com/apache/flink-table-store/pull/584#issuecomment-1459354880

   > @zhangjun0x01 Can we consider using the bucket number as the default 
streaming parallelism, and use parallelism inference only for the batch source?
   
   ok
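
   For reference, the proposed rule could be summarized in a small, purely 
illustrative sketch (the class, method, and parameter names are not from this PR):
   
   ```java
   /** Illustrative only: summarizes the parallelism proposal in this thread. */
   final class SourceParallelismSketch {
       static int derive(boolean isStreaming, int bucketNum, int inferredFromSplits) {
           // Streaming reads follow the bucket count; batch reads use split-based inference.
           return isStreaming ? bucketNum : inferredFromSplits;
       }
   }
   ```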


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store

2023-03-07 Thread via GitHub


zhangjun0x01 commented on PR #584:
URL: 
https://github.com/apache/flink-table-store/pull/584#issuecomment-1459353387

   > I guess the test failures are because too much parallelism has been derived, 
resulting in scheduling failures.
   
   Should we disable parallelism inference by default?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy

2023-03-07 Thread via GitHub


masteryhx commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1128942248


##
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java:
##
@@ -437,18 +465,43 @@ public boolean runCleanupForEveryRecord() {
 DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
 new RocksdbCompactFilterCleanupStrategy(1000L);
 
+/**
+ * Default value lets RocksDB control this feature as needed. For now, 
RocksDB will change
+ * this value to 30 days (i.e 30 * 24 * 60 * 60) so that every file 
goes through the
+ * compaction process at least once every 30 days if not compacted 
sooner.
+ */
+static final long DEFAULT_PERIODIC_COMPACTION_SECONDS = 0xfffffffffffffffeL;

Review Comment:
   Just as you commented from frocksdb:
   `Default: 0xfffffffffffffffe (allow RocksDB to auto-tune). For now, RocksDB 
will change this value to 30 days (i.e. 30 * 24 * 60 * 60) so that every file 
goes through the compaction process at least once every 30 days if not compacted 
sooner.`
   IIUC, it's the default value of this feature in RocksDB?
   Or do we need to introduce a specific default value for Flink, meaning this 
option is not set? WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30983) the security.ssl.algorithms configuration does not take effect in rest ssl

2023-03-07 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-30983:
-
Fix Version/s: 1.17.0
   1.18.0

> the security.ssl.algorithms configuration does not take effect in rest ssl
> --
>
> Key: FLINK-30983
> URL: https://issues.apache.org/jira/browse/FLINK-30983
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: luyuan
>Assignee: Yuxin Tan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.18.0
>
> Attachments: image-2023-02-09-15-58-36-254.png, 
> image-2023-02-09-15-58-43-963.png
>
>
> The security.ssl.algorithms configuration does not take effect in rest ssl.
>  
> SSLUtils#createRestNettySSLContext does not call SslContextBuilder#ciphers as 
>  SSLUtils#createInternalNettySSLContext.
> !image-2023-02-09-15-58-36-254.png!
>  
> !image-2023-02-09-15-58-43-963.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] masteryhx commented on a diff in pull request #21835: [FLINK-30854][state] Support periodic compaction for RocksdbCompactFilterCleanupStrategy

2023-03-07 Thread via GitHub


masteryhx commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1128940896


##
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java:
##
@@ -298,12 +298,40 @@ public Builder cleanupInRocksdbCompactFilter(long 
queryTimeAfterNumEntries) {
 return this;
 }
 
+/**
+ * Cleanup expired state while Rocksdb compaction is running.
+ *
+ * RocksDB compaction filter will query current timestamp, used to 
check expiration, from
+ * Flink every time after processing {@code queryTimeAfterNumEntries} 
number of state
+ * entries. Updating the timestamp more often can improve cleanup 
speed but it decreases
+ * compaction performance because it uses JNI call from native code.
+ *
+ * Periodic compaction could speed up expired state entries 
cleanup, especially for state
+ * entries rarely accessed. Files older than this value will be picked 
up for compaction,
+ * and re-written to the same level as they were before. It makes sure 
a file goes through
+ * compaction filters periodically.
+ *
+ * @param queryTimeAfterNumEntries number of state entries to process 
by compaction filter
+ * before updating current timestamp
+ * @param periodicCompactionSeconds periodic compaction per seconds 
which could speed up
+ * expired state cleanup. 0 means turning off periodic compaction.
+ */
+@Nonnull
+public Builder cleanupInRocksdbCompactFilter(
+long queryTimeAfterNumEntries, long periodicCompactionSeconds) 
{

Review Comment:
   Sorry for the delayed update due to personal business last month.
   Thanks for the suggestion. I replaced `long` with `Time`, which is also the 
type used for the TTL.
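
   A minimal usage sketch, assuming the overload lands with the `(long, Time)` 
signature discussed here (the surrounding class is illustrative):
   
   ```java
   import org.apache.flink.api.common.state.StateTtlConfig;
   import org.apache.flink.api.common.time.Time;
   
   /** Illustrative only: TTL cleanup in the compaction filter plus periodic compaction. */
   final class TtlCompactionExample {
       static StateTtlConfig ttlConfig() {
           return StateTtlConfig.newBuilder(Time.days(7))
                   // Query the current timestamp after every 1000 processed entries,
                   // and have every file re-compacted at least once every 30 days.
                   .cleanupInRocksdbCompactFilter(1000L, Time.days(30))
                   .build();
       }
   }
   ```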



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store

2023-03-07 Thread via GitHub


JingsongLi commented on PR #584:
URL: 
https://github.com/apache/flink-table-store/pull/584#issuecomment-1459327789

   @zhangjun0x01 Can we consider using the bucket number as the default parallelism?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JunRuiLee commented on a diff in pull request #22098: [FLINK-31144][coordination] Modify the judgment logic of whether to ignore the input locations of a ConsumePartitionGroup if the

2023-03-07 Thread via GitHub


JunRuiLee commented on code in PR #22098:
URL: https://github.com/apache/flink/pull/22098#discussion_r1128940538


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java:
##
@@ -95,8 +95,7 @@ private CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations
 // consumers compared to the consumed partition group size. This 
is to avoid tasks
 // unevenly distributed on nodes when running batch jobs or 
running jobs in
 // session/standalone mode.
-if ((double) consumedPartitionGroup.getConsumerVertexGroup().size()
-/ consumedPartitionGroup.size()
+if (consumedPartitionGroup.getConsumerVertexGroup().size()

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] huwh commented on a diff in pull request #22098: [FLINK-31144][coordination] Modify the judgment logic of whether to ignore the input locations of a ConsumePartitionGroup if the corre

2023-03-07 Thread via GitHub


huwh commented on code in PR #22098:
URL: https://github.com/apache/flink/pull/22098#discussion_r1128938614


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetriever.java:
##
@@ -95,8 +95,7 @@ private CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations
 // consumers compared to the consumed partition group size. This 
is to avoid tasks
 // unevenly distributed on nodes when running batch jobs or 
running jobs in
 // session/standalone mode.
-if ((double) consumedPartitionGroup.getConsumerVertexGroup().size()
-/ consumedPartitionGroup.size()
+if (consumedPartitionGroup.getConsumerVertexGroup().size()

Review Comment:
   typo: i -> in



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128937275


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
 mayNotifyAvailable(toNotify);
 }
 
+@GuardedBy("availableMemorySegments")
 private boolean shouldBeAvailable() {
 assert Thread.holdsLock(availableMemorySegments);
 
-return !availableMemorySegments.isEmpty()
-&& unavailableSubpartitionsCount == 0
-&& numberOfRequestedOverdraftMemorySegments == 0;
+return !availableMemorySegments.isEmpty() && 
unavailableSubpartitionsCount == 0;

Review Comment:
   Before we allowed converting `numberOfRequestedOverdraftMemorySegments` to 
`numberOfRequestedMemorySegments`, there could still be a situation where both 
`availableMemorySegments` and the overdraft count were non-zero, and that state 
should obviously be defined as available. But after that change, I'm not sure 
whether this situation still exists. I'm a little worried about potential 
multithreading bugs if we add `numberOfRequestedOverdraftMemorySegments == 0` 
back to `shouldBeAvailable`.
   
   For me, the key point is: if we think "the `availableMemorySegments` is always 
empty when `numberOfRequestedOverdraftMemorySegments != 0`" is tenable, doesn't 
`!availableMemorySegments.isEmpty()` already imply 
`numberOfRequestedOverdraftMemorySegments == 0`? Even if this will not introduce 
a bug, why should we impose a useless constraint?
   
   Actually, before the overdraft buffer was introduced, the definition of 
`available` was very clear:
   there is at least one `availableMemorySegment` and no subpartition has reached 
`maxBuffersPerChannel`.
   IMO, introducing the overdraft mechanism should not break this protocol; the 
overdraft buffer should not affect the judgment of availability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #22126: [Bug][FLINK-31363]KafkaSink failed to commit transactions under EXACTLY_ONCE semantics.

2023-03-07 Thread via GitHub


flinkbot commented on PR #22126:
URL: https://github.com/apache/flink/pull/22126#issuecomment-1459297877

   
   ## CI report:
   
   * 79c14fdf6cfa1d56740e66d78e7b737b803e7591 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31363) KafkaSink failed to commit transactions under EXACTLY_ONCE semantics

2023-03-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31363:
---
Labels: pull-request-available  (was: )

> KafkaSink failed to commit transactions under EXACTLY_ONCE semantics
> 
>
> Key: FLINK-31363
> URL: https://issues.apache.org/jira/browse/FLINK-31363
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: lightzhao
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-03-08-10-54-51-410.png
>
>
> When KafkaSink runs with exactly-once semantics and no data is written to the 
> topic during a checkpoint, the transaction commit fails with the following 
> exception.
> [Transiting to fatal error state due to 
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lightzhao opened a new pull request, #22126: [Bug][FLINK-31363]KafkaSink failed to commit transactions under EXACTLY_ONCE semantics.

2023-03-07 Thread via GitHub


lightzhao opened a new pull request, #22126:
URL: https://github.com/apache/flink/pull/22126

   
   
   
   
   ## What is the purpose of the change
   
   
[FLINK-31363](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-31363)
   When KafkaSink runs with exactly-once semantics and no data is written to the 
topic during a checkpoint, the transaction commit fails with the following 
exception.
   (screenshot: https://user-images.githubusercontent.com/40714172/223610347-41dac9a6-ec87-40c8-9e77-d1eb93f39afd.png)
   
   If data is written during the checkpoint, this exception does not occur.
   
   After the transaction attribute `transactionStarted` is set to false, the 
transaction commit succeeds whether or not data was written during the checkpoint.
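
   For context, a minimal sketch of the EXACTLY_ONCE sink setup that hits this 
code path (the broker address, topic, and transactional-id prefix are placeholders):
   
   ```java
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.connector.base.DeliveryGuarantee;
   import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
   import org.apache.flink.connector.kafka.sink.KafkaSink;
   
   /** Illustrative only: an exactly-once KafkaSink; one transaction is opened per checkpoint. */
   final class ExactlyOnceSinkExample {
       static KafkaSink<String> build() {
           return KafkaSink.<String>builder()
                   .setBootstrapServers("broker:9092")
                   .setRecordSerializer(
                           KafkaRecordSerializationSchema.builder()
                                   .setTopic("output-topic")
                                   .setValueSerializationSchema(new SimpleStringSchema())
                                   .build())
                   // With no incoming records during a checkpoint the transaction
                   // stays empty, which is exactly the case described above.
                   .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                   .setTransactionalIdPrefix("example-tx")
                   .build();
       }
   }
   ```
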
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31364) [Flink] add metrics for TableStore

2023-03-07 Thread Ming Li (Jira)
Ming Li created FLINK-31364:
---

 Summary: [Flink] add metrics for TableStore
 Key: FLINK-31364
 URL: https://issues.apache.org/jira/browse/FLINK-31364
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: Ming Li


Currently, relevant metrics are missing in {{Table Store}}, such as split 
consumption speed, commit information statistics, etc. We can add metrics for 
real-time monitoring of the {{Table Store}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


reswqa commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128926904


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -669,6 +669,13 @@ public void setNumBuffers(int numBuffers) {
 
 currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
+// reset overdraft buffers
+while (numberOfRequestedOverdraftMemorySegments > 0
+&& numberOfRequestedMemorySegments < currentPoolSize) {

Review Comment:
   The reason why I didn't do this conversion here is that we have an upper limit 
on the number of overdraft buffers. If this conversion were allowed and the pool 
size shrank to a very small value, `numberOfRequestedOverdraftMemorySegments` 
would exceed its upper limit.
   Of course, we could convert at most up to the upper limit, but that would make 
the logic a little more complicated, and it seems there is no benefit to 
introducing this mechanism. WDYT?
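
   A hedged sketch of the capped conversion mentioned above (not what this PR 
does; the field names mirror the diff, the cap name is assumed):
   
   ```java
   /** Illustrative only: reclassify ordinary buffers as overdraft on pool shrink, capped. */
   final class CappedShrinkSketch {
       int currentPoolSize;
       int numberOfRequestedMemorySegments;
       int numberOfRequestedOverdraftMemorySegments;
       int maxOverdraftBuffersPerGate;
   
       void onPoolSizeDecreased() {
           // Convert excess ordinary buffers to overdraft, but never beyond the cap.
           while (numberOfRequestedMemorySegments > currentPoolSize
                   && numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate) {
               numberOfRequestedMemorySegments--;
               numberOfRequestedOverdraftMemorySegments++;
           }
       }
   }
   ```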



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] masteryhx commented on pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one

2023-03-07 Thread via GitHub


masteryhx commented on PR #21635:
URL: https://github.com/apache/flink/pull/21635#issuecomment-1459246416

   > Hey @masteryhx, do you plan to work on that for 1.18?
   
   Sure, I'd like to. Thanks a lot for the reminder.
   Sorry for the delayed update due to personal business last month.
   I will rework the PR this month.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31359) HsResultPartitionTest fails with fatal error

2023-03-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697703#comment-17697703
 ] 

Weijie Guo edited comment on FLINK-31359 at 3/8/23 3:00 AM:


master(1.18) via eff9b16799f162f31058be5acac567943a446dea.
This issue only affect master(1.18).


was (Author: weijie guo):
master(1.18) via eff9b16799f162f31058be5acac567943a446dea.

> HsResultPartitionTest fails with fatal error
> 
>
> Key: FLINK-31359
> URL: https://issues.apache.org/jira/browse/FLINK-31359
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matthias Pohl
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46910=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8512
> {code}
> Mar 07 13:20:39 [ERROR] Process Exit Code: 239
> Mar 07 13:20:39 [ERROR] Crashed tests:
> Mar 07 13:20:39 [ERROR] 
> org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartitionTest
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31359) HsResultPartitionTest fails with fatal error

2023-03-07 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-31359.
--
Resolution: Fixed

master(1.18) via eff9b16799f162f31058be5acac567943a446dea.

> HsResultPartitionTest fails with fatal error
> 
>
> Key: FLINK-31359
> URL: https://issues.apache.org/jira/browse/FLINK-31359
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matthias Pohl
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46910=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8512
> {code}
> Mar 07 13:20:39 [ERROR] Process Exit Code: 239
> Mar 07 13:20:39 [ERROR] Crashed tests:
> Mar 07 13:20:39 [ERROR] 
> org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartitionTest
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31359) HsResultPartitionTest fails with fatal error

2023-03-07 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-31359:
---
Fix Version/s: 1.18.0

> HsResultPartitionTest fails with fatal error
> 
>
> Key: FLINK-31359
> URL: https://issues.apache.org/jira/browse/FLINK-31359
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Matthias Pohl
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46910=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8512
> {code}
> Mar 07 13:20:39 [ERROR] Process Exit Code: 239
> Mar 07 13:20:39 [ERROR] Crashed tests:
> Mar 07 13:20:39 [ERROR] 
> org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartitionTest
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] JunRuiLee commented on pull request #22098: [FLINK-31144][coordination] Modify the judgment logic of whether to ignore the input locations of a ConsumePartitionGroup if the correspond

2023-03-07 Thread via GitHub


JunRuiLee commented on PR #22098:
URL: https://github.com/apache/flink/pull/22098#issuecomment-1459238292

   @huwh @zhuzhurk Thanks for reviewing this PR; I've addressed the comments, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa merged pull request #22125: [FLINK-31360][test] Using an executor service which ignore rejectException as shutdown for HsResultPartitionTest.

2023-03-07 Thread via GitHub


reswqa merged PR #22125:
URL: https://github.com/apache/flink/pull/22125


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31363) KafkaSink failed to commit transactions under EXACTLY_ONCE semantics

2023-03-07 Thread lightzhao (Jira)
lightzhao created FLINK-31363:
-

 Summary: KafkaSink failed to commit transactions under 
EXACTLY_ONCE semantics
 Key: FLINK-31363
 URL: https://issues.apache.org/jira/browse/FLINK-31363
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.16.1, 1.17.0, 1.18.0
Reporter: lightzhao
 Attachments: image-2023-03-08-10-54-51-410.png

When KafkaSink runs with exactly-once semantics and no data is written to the 
topic during a checkpoint, the transaction commit fails with the following 
exception.

[Transiting to fatal error state due to 
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31298) ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException

2023-03-07 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697698#comment-17697698
 ] 

Wencong Liu commented on FLINK-31298:
-

cc [~mapohl] 

> ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows 
> IllegalArgumentException
> -
>
> Key: FLINK-31298
> URL: https://issues.apache.org/jira/browse/FLINK-31298
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, starter, test-stability
>
> FLINK-24156 introduced {{NetUtils.acceptWithoutTimeout}}, which caused the 
> test to print the stack trace of an {{IllegalArgumentException}}:
> {code}
> Exception in thread "Thread-0" java.lang.IllegalArgumentException: 
> serverSocket SO_TIMEOUT option must be 0
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>   at 
> org.apache.flink.util.NetUtils.acceptWithoutTimeout(NetUtils.java:139)
>   at 
> org.apache.flink.runtime.net.ConnectionUtilsTest$1.run(ConnectionUtilsTest.java:83)
>   at java.lang.Thread.run(Thread.java:750)
> {code}
> This is also shown in the Maven output of CI runs and might cause confusion. 
> The test should be fixed.
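
A small sketch of the precondition behind that stack trace, assuming 
{{NetUtils.acceptWithoutTimeout}} keeps the signature shown above (the wrapper 
class is illustrative):

{code:java}
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.flink.util.NetUtils;

/** Illustrative only: acceptWithoutTimeout requires SO_TIMEOUT == 0. */
final class AcceptWithoutTimeoutSketch {
    static Socket accept() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        serverSocket.setSoTimeout(0); // any non-zero timeout trips the precondition
        return NetUtils.acceptWithoutTimeout(serverSocket);
    }
}
{code}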



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] WencongLiu commented on pull request #22121: [FLINK-27051] fix CompletedCheckpoint.DiscardObject.discard is not idempotent

2023-03-07 Thread via GitHub


WencongLiu commented on PR #22121:
URL: https://github.com/apache/flink/pull/22121#issuecomment-1459222793

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #22084: [FLINK-31293][runtime] LocalBufferPool request overdraft buffer only when no available buffer and pool size is reached

2023-03-07 Thread via GitHub


1996fanrui commented on code in PR #22084:
URL: https://github.com/apache/flink/pull/22084#discussion_r1128900085


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -503,12 +508,11 @@ private void onGlobalPoolAvailable() {
 mayNotifyAvailable(toNotify);
 }
 
+@GuardedBy("availableMemorySegments")
 private boolean shouldBeAvailable() {
 assert Thread.holdsLock(availableMemorySegments);
 
-return !availableMemorySegments.isEmpty()
-&& unavailableSubpartitionsCount == 0
-&& numberOfRequestedOverdraftMemorySegments == 0;
+return !availableMemorySegments.isEmpty() && 
unavailableSubpartitionsCount == 0;

Review Comment:
   Why can the `numberOfRequestedOverdraftMemorySegments == 0` condition be 
removed? I think there is a constraint here: the `availableMemorySegments` is 
always empty when `numberOfRequestedOverdraftMemorySegments > 0`.
   
   Sometimes this constraint did not hold before this PR, and we want the 
constraint to hold in the future, right?
   
   If yes, could you add some comments for 
`numberOfRequestedOverdraftMemorySegments`? It would help other developers 
understand the constraint.



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##
@@ -242,23 +243,206 @@ public void testRecycleAfterDestroy() {
 localBufferPool.lazyDestroy();
 
 // All buffers have been requested, but can not be returned yet.
-assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
+
assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers);
 
 // Recycle should return buffers to memory segment pool
 for (Buffer buffer : requests) {
 buffer.recycleBuffer();
 }
 }
 
+@Test
+void testDecreasePoolSize() throws Exception {
+final int maxMemorySegments = 10;
+final int requiredMemorySegments = 4;
+final int maxOverdraftBuffers = 2;
+final int largePoolSize = 5;
+final int smallPoolSize = 4;
+LocalBufferPool bufferPool =
+new LocalBufferPool(
+networkBufferPool,
+requiredMemorySegments,
+maxMemorySegments,
+0,
+Integer.MAX_VALUE,
+maxOverdraftBuffers);
+Queue<MemorySegment> buffers = new LinkedList<>();
+
+// set a larger pool size.
+bufferPool.setNumBuffers(largePoolSize);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
+
+// request all buffer.
+for (int i = 0; i < largePoolSize; i++) {
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+}
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// request 1 overdraft buffers.
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// set a small pool size.
+bufferPool.setNumBuffers(smallPoolSize);
+assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
+
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+buffers.add(bufferPool.requestMemorySegmentBlocking());
+assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2);
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return all overdraft buffers.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isFalse();
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero();
+assertThat(bufferPool.isAvailable()).isFalse();
+
+// return the excess buffer.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.isAvailable()).isFalse();
+// return non-excess buffers.
+bufferPool.recycle(buffers.poll());
+assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();
+assertThat(bufferPool.isAvailable()).isTrue();
+
+while (!buffers.isEmpty()) {
+bufferPool.recycle(buffers.poll());
+}
+bufferPool.lazyDestroy();
+}
+
+@Test
+void testIncreasePoolSize() throws Exception {

Review Comment:
   I'd prefer to rename `testIncreasePoolSize` to
`testIncreasePoolSizeExceedTotalBuffers`, and keep
`testIncreasePoolSizeExceedTotalBuffers` and
`testIncreasePoolSizeNotExceedTotalBuffers` as the normal

[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor

2023-03-07 Thread via GitHub


Fanoid commented on code in PR #210:
URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128905027


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorage.java:
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.sharedstorage;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** A shared storage to support access through subtasks of different operators. */
+class SharedStorage {
+private static final Map<Tuple3<StorageID, Integer, String>, Object> m =

Review Comment:
   Sure! I'll rename the variables in this class.
   
   For the latter issue, I prefer to think of `SharedStorage` as a container of
**references** to data items that provides a sharing utility among the subtasks
of operators. The owner of a data item is still the one that has to worry about
memory issues.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor

2023-03-07 Thread via GitHub


Fanoid commented on code in PR #210:
URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128889422


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorage.java:
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.sharedstorage;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** A shared storage to support access through subtasks of different operators. */
+class SharedStorage {
+private static final Map<Tuple3<StorageID, Integer, String>, Object> m =
+new ConcurrentHashMap<>();
+
+private static final Map<Tuple3<StorageID, Integer, String>, String> owners =
+new ConcurrentHashMap<>();
+
+/** Gets a {@link Reader} of the shared item identified by (storageID, subtaskId, descriptor). */
+static <T> Reader<T> getReader(
+StorageID storageID, int subtaskId, ItemDescriptor<T> descriptor) {
+return new Reader<>(Tuple3.of(storageID, subtaskId, descriptor.key));
+}
+
+/** Gets a {@link Writer} of the shared item identified by (storageID, subtaskId, key). */
+static <T> Writer<T> getWriter(
+StorageID storageID,
+int subtaskId,
+ItemDescriptor<T> descriptor,
+String ownerId,
+OperatorID operatorID,
+StreamTask<?, ?> containingTask,
+StreamingRuntimeContext runtimeContext,
+StateInitializationContext stateInitializationContext) {
+Tuple3<StorageID, Integer, String> t = Tuple3.of(storageID, subtaskId, descriptor.key);
+String lastOwner = owners.putIfAbsent(t, ownerId);
+if (null != lastOwner) {
+throw new IllegalStateException(
+String.format(
+"The shared item (%s, %s, %s) already has a writer %s.",
+storageID, subtaskId, descriptor.key, ownerId));
+}
+Writer<T> writer =
+new Writer<>(
+t,
+ownerId,
+descriptor.serializer,
+containingTask,
+runtimeContext,
+stateInitializationContext,
+operatorID);
+writer.set(descriptor.initVal);
+return writer;
+}
+
+static class Reader<T> {
+protected final Tuple3<StorageID, Integer, String> t;
+
+Reader(Tuple3<StorageID, Integer, String> t) {
+this.t = t;
+}
+
+T get() {
+// It is possible that the `get` request of an item is triggered earlier than its
+// initialization. In this case, we wait for a while.
+long waitTime = 10;
+do {
+//noinspection unchecked
+T value = (T) m.get(t);
+if (null != value) {
+return value;
+}
+try {
+Thread.sleep(waitTime);
+} catch (InterruptedException e) {
+break;
+}
+waitTime *= 2;
+} while (waitTime < 10 * 1000);
+throw new IllegalStateException(
+String.format("Failed to get value of %s after waiting %d ms.", t, waitTime));
+}
+}
+
+static class Writer<T> extends Reader<T> {
+private final String ownerId;
+private final ListStateWithCache<T> cache;
+private boolean isDirty;
+
+Writer(
+Tuple3<StorageID, Integer, String> t,
+String ownerId,
+TypeSerializer<T> serializer,
+StreamTask<?, ?> containingTask,
+StreamingRuntimeContext 

[GitHub] [flink] TanYuxin-tyx commented on pull request #22125: [FLINK-31360][test] Using an executor service which ignore rejectException as shutdown for HsResultPartitionTest.

2023-03-07 Thread via GitHub


TanYuxin-tyx commented on PR #22125:
URL: https://github.com/apache/flink/pull/22125#issuecomment-1459208813

   This change LGTM.
   I found similar solutions to this issue elsewhere, e.g., in SchedulerFactory
and KubernetesLeaderElector.
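   
   For reference, a hedged, generic illustration of that pattern using plain
`java.util.concurrent` (not the Flink test code itself):
   
   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   class DiscardingExecutorSketch {
       public static void main(String[] args) {
           ThreadPoolExecutor executor = new ThreadPoolExecutor(
                   1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
           // DiscardPolicy silently drops tasks submitted after shutdown
           // instead of throwing RejectedExecutionException.
           executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
           executor.shutdown();
           executor.execute(() -> System.out.println("never runs, silently discarded"));
       }
   }
   ```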


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31346) Batch shuffle IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0

2023-03-07 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697402#comment-17697402
 ] 

Weijie Guo edited comment on FLINK-31346 at 3/8/23 2:34 AM:


master(1.18) via 5ad2ae2c24ade2655981f609298978d26329466f.
release-1.17 via 54c67e5e08c11ef9a538abbf14618f9e27be18f7.


was (Author: weijie guo):
master(1.18) via 5ad2ae2c24ade2655981f609298978d26329466f.

> Batch shuffle IO scheduler does not throw TimeoutException if 
> numRequestedBuffers is greater than 0
> ---
>
> Key: FLINK-31346
> URL: https://issues.apache.org/jira/browse/FLINK-31346
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.1
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> We currently rely on throwing an exception to trigger downstream task failover
> and avoid a read-buffer request deadlock. But if {{numRequestedBuffers}} is
> greater than 0, the IO scheduler does not throw a {{TimeoutException}}, which
> can cause a deadlock.
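
An illustrative reconstruction of the faulty guard (assumed field names, not the
actual scheduler code):

```java
// Hedged sketch of the bug: the timeout is swallowed whenever some buffers
// were already requested, so no failover is triggered and the job can hang.
class BufferRequestTimeoutSketch {
    private long deadlineNanos;
    private int numRequestedBuffers;

    void checkBufferRequestTimeout() throws java.util.concurrent.TimeoutException {
        boolean timedOut = System.nanoTime() > deadlineNanos;
        if (timedOut && numRequestedBuffers <= 0) { // buggy guard
            throw new java.util.concurrent.TimeoutException("Buffer request timed out.");
        }
        // When numRequestedBuffers > 0 the timeout is ignored entirely,
        // which is the deadlock described in this ticket.
    }
}
```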



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor

2023-03-07 Thread via GitHub


Fanoid commented on code in PR #210:
URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128873310


##
flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorageContextImpl.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.sharedstorage;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Default implementation of {@link SharedStorageContext} using {@link SharedStorage}. */
+@SuppressWarnings("rawtypes")
+class SharedStorageContextImpl implements SharedStorageContext, Serializable {
+private final StorageID storageID;
+private final Map<ItemDescriptor<?>, SharedStorage.Writer> writers = new HashMap<>();
+private final Map<ItemDescriptor<?>, SharedStorage.Reader> readers = new HashMap<>();
+private Map<ItemDescriptor<?>, String> ownerMap;
+
+public SharedStorageContextImpl() {
+this.storageID = new StorageID();
+}
+
+public void setOwnerMap(Map<ItemDescriptor<?>, String> ownerMap) {
+this.ownerMap = ownerMap;
+}
+
+@Override
+public void invoke(BiConsumerWithException func)
+throws Exception {
+func.accept(this::getSharedItem, this::setSharedItem);
+}
+
+private <T> T getSharedItem(ItemDescriptor<T> key) {
+//noinspection unchecked
+SharedStorage.Reader<T> reader = readers.get(key);
+Preconditions.checkState(
+null != reader,
+String.format(
+"The operator requested to read a shared item %s not owned by itself.",
+key));
+return reader.get();
+}
+
+private <T> void setSharedItem(ItemDescriptor<T> key, T value) {
+//noinspection unchecked
+SharedStorage.Writer<T> writer = writers.get(key);
+Preconditions.checkState(
+null != writer,
+String.format(
+"The operator requested to write a shared item %s not owned by itself.",
+key));
+writer.set(value);
+}
+
+@Override
+public <T extends AbstractStreamOperator<?> & SharedStorageStreamOperator> void initializeState(
+T operator,
+StreamingRuntimeContext runtimeContext,
+StateInitializationContext context) {
+String ownerId = operator.getSharedStorageAccessorID();
+int subtaskId = runtimeContext.getIndexOfThisSubtask();
+for (Map.Entry<ItemDescriptor<?>, String> entry : ownerMap.entrySet()) {
+ItemDescriptor<?> descriptor = entry.getKey();
+if (ownerId.equals(entry.getValue())) {
+writers.put(

Review Comment:
   There are no instances to be overwritten here, because every subtask owns a
distinct instance of `SharedStorageContextImpl`.
   
   The `context` field of an operator is serialized and then deserialized into a
different instance when the operator is deployed, just like any other field.
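   
   A self-contained illustration of that point with plain Java serialization
(not Flink code): every deserialization produces a distinct instance.
   
   ```java
   import java.io.*;
   
   class DeserializationYieldsDistinctInstances {
       static class Context implements Serializable {}
   
       public static void main(String[] args) throws Exception {
           Context original = new Context();
           ByteArrayOutputStream bytes = new ByteArrayOutputStream();
           try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
               out.writeObject(original);
           }
           try (ObjectInputStream in =
                   new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
               Context copy = (Context) in.readObject();
               System.out.println(copy != original); // true: a distinct instance
           }
       }
   }
   ```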



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-31343) Remove JMH dependency in flink-table-store-micro-benchmark

2023-03-07 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-31343.

Resolution: Fixed

master: d1c224890dad80c1d7664dc3011534b3ae1679f9

> Remove JMH dependency in flink-table-store-micro-benchmark
> --
>
> Key: FLINK-31343
> URL: https://issues.apache.org/jira/browse/FLINK-31343
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #585: [FLINK-31343] Remove JMH dependency in flink-table-store-micro-benchmark

2023-03-07 Thread via GitHub


JingsongLi merged PR #585:
URL: https://github.com/apache/flink-table-store/pull/585


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] FangYongs commented on pull request #585: [FLINK-31343] Remove JMH dependency in flink-table-store-micro-benchmark

2023-03-07 Thread via GitHub


FangYongs commented on PR #585:
URL: 
https://github.com/apache/flink-table-store/pull/585#issuecomment-1459150128

   Thanks @JingsongLi  LGTM +1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] LadyForest commented on pull request #21452: [FLINK-30282] Fix Logical type ROW lost inner field's nullability after converting to RelDataType

2023-03-07 Thread via GitHub


LadyForest commented on PR #21452:
URL: https://github.com/apache/flink/pull/21452#issuecomment-1459148347

   > @LadyForest this change looks reasonable however now there are some 
conflicts... Do you mind if I ask you to rebase?
   
   Sure. I'll do the rebase. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor

2023-03-07 Thread via GitHub


Fanoid commented on code in PR #210:
URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128853646


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/operators/CalcLocalSplitsOperator.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.gbt.operators;
+
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.iteration.IterationListener;
+import org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.common.gbt.defs.Histogram;
+import org.apache.flink.ml.common.gbt.defs.LearningNode;
+import org.apache.flink.ml.common.gbt.defs.Splits;
+import org.apache.flink.ml.common.sharedstorage.SharedStorageContext;
+import org.apache.flink.ml.common.sharedstorage.SharedStorageStreamOperator;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+/** Calculates local splits for assigned (nodeId, featureId) pairs. */
+public class CalcLocalSplitsOperator extends AbstractStreamOperator
+implements OneInputStreamOperator,
+IterationListener,
+SharedStorageStreamOperator {
+
+private static final String SPLIT_FINDER_STATE_NAME = "split_finder";
+private final String sharedStorageAccessorID;
+// States of local data.
+private transient ListStateWithCache splitFinderState;
+private transient SplitFinder splitFinder;
+private transient SharedStorageContext sharedStorageContext;
+
+public CalcLocalSplitsOperator() {
+sharedStorageAccessorID = getClass().getSimpleName() + "-" + UUID.randomUUID();

Review Comment:
   Actually, I've tried `StreamOperator#getOperatorID` before.
   
   However, the operator ID cannot be obtained before execution, so we are
unable to specify the owner map of shared data items when building the graph.
Without the owner map, it is difficult to control access at runtime.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on a diff in pull request #210: [FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor

2023-03-07 Thread via GitHub


Fanoid commented on code in PR #210:
URL: https://github.com/apache/flink-ml/pull/210#discussion_r1128853646


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/operators/CalcLocalSplitsOperator.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.gbt.operators;
+
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.iteration.IterationListener;
+import org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.common.gbt.defs.Histogram;
+import org.apache.flink.ml.common.gbt.defs.LearningNode;
+import org.apache.flink.ml.common.gbt.defs.Splits;
+import org.apache.flink.ml.common.sharedstorage.SharedStorageContext;
+import org.apache.flink.ml.common.sharedstorage.SharedStorageStreamOperator;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+/** Calculates local splits for assigned (nodeId, featureId) pairs. */
+public class CalcLocalSplitsOperator extends AbstractStreamOperator
+implements OneInputStreamOperator,
+IterationListener,
+SharedStorageStreamOperator {
+
+private static final String SPLIT_FINDER_STATE_NAME = "split_finder";
+private final String sharedStorageAccessorID;
+// States of local data.
+private transient ListStateWithCache splitFinderState;
+private transient SplitFinder splitFinder;
+private transient SharedStorageContext sharedStorageContext;
+
+public CalcLocalSplitsOperator() {
+sharedStorageAccessorID = getClass().getSimpleName() + "-" + UUID.randomUUID();

Review Comment:
   Actually, I've tried `StreamOperator#getOperatorID` before.
   
   Since the operator ID cannot be obtained before execution, we are unable to
specify the owner map of shared data items when building the graph. Without the
owner map, it is difficult to control access at runtime.
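   
   A hedged sketch of the workaround this constraint leads to (illustrative
names, not the PR's exact code): mint a stable accessor ID at construction
time, since it must exist before the operator ID is assigned at deployment.
   
   ```java
   import java.util.UUID;
   
   class AccessorIdSketch {
       // Stable as soon as the operator object is constructed, i.e. at
       // graph-building time, unlike getOperatorID(), which is only
       // meaningful once the operator is actually deployed.
       private final String sharedStorageAccessorID =
               getClass().getSimpleName() + "-" + UUID.randomUUID();
   }
   ```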
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 closed pull request #21638: [FLINK-29948][rest] Add rootCause in the ErrorResponseBody

2023-03-07 Thread via GitHub


fsk119 closed pull request #21638: [FLINK-29948][rest] Add rootCause in the 
ErrorResponseBody
URL: https://github.com/apache/flink/pull/21638


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] weifonghsia commented on pull request #22085: PubSubSink should be using "withTopicName"

2023-03-07 Thread via GitHub


weifonghsia commented on PR #22085:
URL: https://github.com/apache/flink/pull/22085#issuecomment-1459075862

   @MartijnVisser - Would you be able to review this PR?
   Fairly small doc change 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0

2023-03-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31362:
---
Description: 
In Calcite 1.33.0, C-style escape strings have been supported. We could 
leverage it to enhance our string literals usage.

issue: https://issues.apache.org/jira/browse/CALCITE-5305


  was:
In Calcite 1.33.0, C-style escape strings have been supported. We should 
outline its usage in document after upgrading to Calcite 1.33.0



> Upgrade to Calcite version to 1.33.0
> 
>
> Key: FLINK-31362
> URL: https://issues.apache.org/jira/browse/FLINK-31362
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Aitozi
>Priority: Major
>
> In Calcite 1.33.0, C-style escape strings have been supported. We could 
> leverage it to enhance our string literals usage.
> issue: https://issues.apache.org/jira/browse/CALCITE-5305



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #546: [FLINK-31326] Consolidate source scaling logic

2023-03-07 Thread via GitHub


gyfora commented on code in PR #546:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/546#discussion_r1128753730


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java:
##
@@ -34,7 +33,11 @@ public enum MetricAggregator {
 this.getter = getter;
 }
 
-public Optional get(AggregatedMetric metric) {
-return Optional.ofNullable(metric).map(getter).filter(d -> !d.isNaN());
+public double get(AggregatedMetric metric) {
+if (metric != null) {
+return getter.apply(metric);
+} else {
+return Double.NaN;
+}

Review Comment:
   This change could be the reason why the lag growth rate may be NaN.
   Before this commit (after your pending-records-related change) the lag would
have been 0, whereas now this may return Double.NaN.
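   
   A quick illustration of why that matters downstream (plain Java, not the
autoscaler code): NaN propagates silently through arithmetic, so a missing
metric turns any derived rate into NaN.
   
   ```java
   class NanPropagationSketch {
       public static void main(String[] args) {
           double pendingRecords = Double.NaN; // metric missing -> NaN instead of 0
           double lagGrowthRate = pendingRecords - 100.0;
           System.out.println(lagGrowthRate); // prints NaN, not -100.0
       }
   }
   ```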



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0

2023-03-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31362:
---
Component/s: Table SQL / API
 (was: Documentation)

> Upgrade to Calcite version to 1.33.0
> 
>
> Key: FLINK-31362
> URL: https://issues.apache.org/jira/browse/FLINK-31362
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Aitozi
>Priority: Major
>
> In Calcite 1.33.0, C-style escape strings have been supported. We should 
> outline its usage in document after upgrading to Calcite 1.33.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31362) Upgrade to Calcite version to 1.33.0

2023-03-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31362:
---
Summary: Upgrade to Calcite version to 1.33.0  (was: Add document about how 
to use C-style escape strings)

> Upgrade to Calcite version to 1.33.0
> 
>
> Key: FLINK-31362
> URL: https://issues.apache.org/jira/browse/FLINK-31362
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> In Calcite 1.33.0, C-style escape strings have been supported. We should 
> outline its usage in document after upgrading to Calcite 1.33.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31362) Add document about how to use C-style escape strings

2023-03-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31362:
---
Description: 
In Calcite 1.33.0, C-style escape strings have been supported. We should 
outline its usage in document after upgrading to Calcite 1.33.0


> Add document about how to use C-style escape strings
> 
>
> Key: FLINK-31362
> URL: https://issues.apache.org/jira/browse/FLINK-31362
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Aitozi
>Priority: Major
>
> In Calcite 1.33.0, C-style escape strings have been supported. We should 
> outline its usage in document after upgrading to Calcite 1.33.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31362) Add document about how to use C-style escape strings

2023-03-07 Thread Aitozi (Jira)
Aitozi created FLINK-31362:
--

 Summary: Add document about how to use C-style escape strings
 Key: FLINK-31362
 URL: https://issues.apache.org/jira/browse/FLINK-31362
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31361) job created by sql-client can't authenticate to kafka, can't find org.apache.kafka.common.security.plain.PlainLoginModule

2023-03-07 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697645#comment-17697645
 ] 

David Anderson commented on FLINK-31361:


I'm using the sql-client to create the job.

> job created by sql-client can't authenticate to kafka, can't find 
> org.apache.kafka.common.security.plain.PlainLoginModule
> -
>
> Key: FLINK-31361
> URL: https://issues.apache.org/jira/browse/FLINK-31361
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: David Anderson
>Priority: Major
>
> I'm working with this SQL DDL:
> {noformat}
> CREATE TABLE pageviews_sink (
>   `url` STRING,
>   `user_id` STRING,
>   `browser` STRING,
>   `ts` TIMESTAMP_LTZ(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'pageviews',
>   'properties.bootstrap.servers' = 'xxx.confluent.cloud:9092',
>   'properties.security.protocol'='SASL_SSL',
>   'properties.sasl.mechanism'='PLAIN',
>   
> 'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
>  required username="xxx" password="xxx";',
>   'key.format' = 'json',
>   'key.fields' = 'url',
>   'value.format' = 'json'
> );
> {noformat}
> With {{flink-sql-connector-kafka-1.16.1.jar}} in the lib directory, this 
> fails with 
> {noformat}
> Caused by: javax.security.auth.login.LoginException: No LoginModule found for 
> org.apache.kafka.common.security.plain.PlainLoginModule{noformat}
> As a workaround I've found that it does work if I provide both
>  
> {{flink-connector-kafka-1.16.1.jar}}
> {{kafka-clients-3.2.3.jar}}
>  
> in the lib directory. It seems like the relocation applied in the SQL 
> connector isn't working properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31361) job created by sql-client can't authenticate to kafka, can't find org.apache.kafka.common.security.plain.PlainLoginModule

2023-03-07 Thread David Anderson (Jira)
David Anderson created FLINK-31361:
--

 Summary: job created by sql-client can't authenticate to kafka, 
can't find org.apache.kafka.common.security.plain.PlainLoginModule
 Key: FLINK-31361
 URL: https://issues.apache.org/jira/browse/FLINK-31361
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.16.1
Reporter: David Anderson


I'm working with this SQL DDL:
{noformat}
CREATE TABLE pageviews_sink (
  `url` STRING,
  `user_id` STRING,
  `browser` STRING,
  `ts` TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = 'xxx.confluent.cloud:9092',
  'properties.security.protocol'='SASL_SSL',
  'properties.sasl.mechanism'='PLAIN',
  
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
 required username="xxx" password="xxx";',
  'key.format' = 'json',
  'key.fields' = 'url',
  'value.format' = 'json'
);
{noformat}
With {{flink-sql-connector-kafka-1.16.1.jar}} in the lib directory, this fails 
with 
{noformat}
Caused by: javax.security.auth.login.LoginException: No LoginModule found for 
org.apache.kafka.common.security.plain.PlainLoginModule{noformat}
As a workaround I've found that it does work if I provide both
 
{{flink-connector-kafka-1.16.1.jar}}
{{kafka-clients-3.2.3.jar}}
 
in the lib directory. It seems like the relocation applied in the SQL connector 
isn't working properly.
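
Possibly related, and hedged since I haven't verified it against 1.16.1: if the
fat jar relocates the Kafka classes under {{org.apache.flink.kafka.shaded}},
the JAAS config may need to reference the relocated class name instead, e.g.:
{noformat}
'properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";'
{noformat}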



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30609) Add ephemeral storage to CRD

2023-03-07 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-30609:
--

Assignee: Prabcs  (was: Matyas Orhidi)

> Add ephemeral storage to CRD
> 
>
> Key: FLINK-30609
> URL: https://issues.apache.org/jira/browse/FLINK-30609
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Matyas Orhidi
>Assignee: Prabcs
>Priority: Major
>  Labels: starter
> Fix For: kubernetes-operator-1.5.0
>
>
> We should consider adding ephemeral storage to the existing [resource 
> specification 
> |https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource]in
>  CRD, next to {{cpu}} and {{memory}}
> https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#setting-requests-and-limits-for-local-ephemeral-storage



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30609) Add ephemeral storage to CRD

2023-03-07 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697635#comment-17697635
 ] 

Gyula Fora commented on FLINK-30609:


[~morhidi] do you mind if [~pbharaj] takes over this ticket?

> Add ephemeral storage to CRD
> 
>
> Key: FLINK-30609
> URL: https://issues.apache.org/jira/browse/FLINK-30609
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Matyas Orhidi
>Assignee: Matyas Orhidi
>Priority: Major
>  Labels: starter
> Fix For: kubernetes-operator-1.5.0
>
>
> We should consider adding ephemeral storage to the existing [resource 
> specification 
> |https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#resource]in
>  CRD, next to {{cpu}} and {{memory}}
> https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#setting-requests-and-limits-for-local-ephemeral-storage



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

