[jira] [Comment Edited] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698227#comment-17698227
 ] 

Yue Ma edited comment on FLINK-31238 at 3/9/23 7:55 AM:


[~yunta] Thanks for replying!

Yes, in the current implementation both Clip and Ingest happen in the 
initialization phase. But compared to the previous way, the blocking time 
will be much reduced.

I have also thought about asynchronous recovery during rescaling. For example, 
we could delete data that does not belong to the Task's KeyGroup through a 
special compaction and ensure that this compaction is executed at the first 
checkpoint, or mark the invalid data (similar to DeleteRange) and then clean 
it up asynchronously.

Considering that most of our online jobs use the RocksDB state backend, we need 
to support these features on RocksDB. It seems that some of the other solutions 
would bring more changes to RocksDB. Weighing both the risks and the benefits, 
we chose the approach above.


was (Author: mayuehappy):
[~yunta] Thanks for replying!

Yes, in the current implementation both Clip and Ingest happen in the 
initialization phase. But compared to the previous rescaling recovery method, 
the blocking time will be much reduced.

I also thought about asynchronous recovery during rescaling. For example, 
we could delete data that does not belong to the Task's KeyGroup through a 
special compaction and ensure that this compaction is executed at the first 
checkpoint, or mark the invalid data (similar to DeleteRange) and then clean 
it up asynchronously.

Considering that most of our online jobs use the RocksDB state backend, we need 
to support these features on RocksDB. It seems that some of the other solutions 
would bring more changes to RocksDB. Weighing both the risks and the benefits, 
we chose the approach above.

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png, 
> image-2023-03-09-15-46-01-176.png, image-2023-03-09-15-50-04-281.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
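> For reference, the existing recovery path described above roughly follows the 
> RocksJava pattern sketched below. This is a simplified illustration only: the 
> method name {{restoreIntoMainDb}}, the key-range sentinels, and the entry 
> iterable are placeholders, not Flink's actual restore code.
{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.util.Map;

public final class WriteBatchRestoreBaseline {
    /**
     * Copies the entries that belong to the task's key-group range into the main DB
     * via a WriteBatch, then drops everything outside the range with deleteRange.
     * Key-group boundaries are passed as serialized key prefixes (illustrative only).
     */
    public static void restoreIntoMainDb(RocksDB mainDb,
                                         Iterable<Map.Entry<byte[], byte[]>> validEntries,
                                         byte[] rangeBegin,
                                         byte[] rangeEnd) throws RocksDBException {
        try (WriteOptions writeOptions = new WriteOptions();
             WriteBatch batch = new WriteBatch()) {
            writeOptions.setDisableWAL(true);
            for (Map.Entry<byte[], byte[]> entry : validEntries) {
                batch.put(entry.getKey(), entry.getValue());
            }
            mainDb.write(writeOptions, batch);
        }
        // Clean up key groups that no longer belong to this task. deleteRange only
        // writes a range tombstone; the data is removed later by compaction, which is
        // why large-state rescaling stays slow with this approach.
        mainDb.deleteRange(new byte[0], rangeBegin);
        mainDb.deleteRange(rangeEnd, maxKey());
    }

    private static byte[] maxKey() {
        // Illustrative sentinel for "end of keyspace"; a real implementation derives
        // this from the maximum key group.
        byte[] k = new byte[16];
        java.util.Arrays.fill(k, (byte) 0xFF);
        return k;
    }
}
{code}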
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the 

[GitHub] [flink] zhuzhurk commented on a diff in pull request #22098: [FLINK-31144][coordination] Modify the judgment logic of whether to ignore the input locations of a ConsumePartitionGroup if the c

2023-03-08 Thread via GitHub


zhuzhurk commented on code in PR #22098:
URL: https://github.com/apache/flink/pull/22098#discussion_r1130607703


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultPreferredLocationsRetrieverTest.java:
##
@@ -81,16 +81,6 @@ void testInputLocations() {
 producerLocations,
 Collections.emptySet());
 }

Review Comment:
   This `{}` pair is no longer needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698227#comment-17698227
 ] 

Yue Ma commented on FLINK-31238:


[~yunta] Thanks for replying!

Yes, in the current implementation both Clip and Ingest happen in the 
initialization phase. But compared to the previous rescaling recovery method, 
the blocking time will be much reduced.

I also thought about asynchronous recovery during rescaling. For example, 
we could delete data that does not belong to the Task's KeyGroup through a 
special compaction and ensure that this compaction is executed at the first 
checkpoint, or mark the invalid data (similar to DeleteRange) and then clean 
it up asynchronously.

Considering that most of our online jobs use the RocksDB state backend, we need 
to support these features on RocksDB. It seems that some of the other solutions 
would bring more changes to RocksDB. Weighing both the risks and the benefits, 
we chose the approach above.

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png, 
> image-2023-03-09-15-46-01-176.png, image-2023-03-09-15-50-04-281.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
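> To make the restore sequence above concrete, a minimal Java sketch of the proposed 
> flow is shown below. {{clipDb}} and {{ingestDb}} stand for the ClipDB/IngestDB 
> extensions described in this ticket; they do not exist in upstream RocksDB/RocksJava, 
> and their signatures here, as well as the wrapper interface, are assumptions.
{code:java}
import java.nio.file.Path;
import java.util.List;
import java.util.function.Function;

/** Hypothetical wrapper around the frocksdb extensions proposed in this ticket. */
interface ClipIngestDb extends AutoCloseable {
    void disableAutoCompaction() throws Exception;
    void enableAutoCompaction() throws Exception;
    /** Assumed: keep SSTs fully inside [begin, end), drop SSTs fully outside,
     *  and rewrite only the boundary SSTs; no tombstones are written. */
    void clipDb(byte[] begin, byte[] end) throws Exception;
    /** Assumed: hard-link all SSTs of tmpDb into this DB; key ranges must not overlap. */
    void ingestDb(ClipIngestDb tmpDb) throws Exception;
}

final class RescalingRestoreSketch {
    static ClipIngestDb restore(List<Path> stateHandles,
                                byte[] keyGroupBegin,
                                byte[] keyGroupEnd,
                                Function<Path, ClipIngestDb> openDb) throws Exception {
        // 1. Open the first StateHandle as the main DB and pause compaction.
        ClipIngestDb mainDb = openDb.apply(stateHandles.get(0));
        mainDb.disableAutoCompaction();
        // 2. Clip the main DB to the task's key-group range.
        mainDb.clipDb(keyGroupBegin, keyGroupEnd);
        // 3. Open the remaining StateHandles as temporary DBs, clip them, then ingest.
        for (Path handle : stateHandles.subList(1, stateHandles.size())) {
            try (ClipIngestDb tmpDb = openDb.apply(handle)) {
                tmpDb.clipDb(keyGroupBegin, keyGroupEnd);
                mainDb.ingestDb(tmpDb);
            }
        }
        // 4. Re-enable compaction on the main DB.
        mainDb.enableAutoCompaction();
        return mainDb;
    }
}
{code}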
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 

[jira] [Updated] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-31238:
---
Attachment: image-2023-03-09-15-50-04-281.png

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png, 
> image-2023-03-09-15-46-01-176.png, image-2023-03-09-15-50-04-281.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means using the recovery method of generating sst files through 
> SstFileWriter in parallel)
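> The SST_File_Writer baseline in the table corresponds roughly to the RocksJava 
> pattern sketched below: write the task's key-value pairs (in sorted order) into 
> SST files with SstFileWriter and ingest them into the target DB. This is a 
> simplified illustration under assumed names such as {{bulkLoad}}, not the actual 
> Flink implementation.
{code:java}
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public final class SstFileWriterBaseline {
    /** Writes pre-sorted key-value pairs into one SST file and ingests it into {@code db}. */
    public static void bulkLoad(RocksDB db,
                                List<Map.Entry<byte[], byte[]>> sortedEntries,
                                String sstPath) throws RocksDBException {
        try (EnvOptions envOptions = new EnvOptions();
             Options options = new Options();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(sstPath);
            // SstFileWriter requires keys to be added in strictly increasing order.
            for (Map.Entry<byte[], byte[]> entry : sortedEntries) {
                writer.put(entry.getKey(), entry.getValue());
            }
            writer.finish();
        }
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            // Move (hard-link) the file into the DB instead of copying where possible.
            ingestOptions.setMoveFiles(true);
            db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
        }
    }
}
{code}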
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff}Iteration 1: 3.922 s/op{color}
> {color:#ff}Iteration 2: 3.208 s/op{color}
> {color:#ff}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff}Iteration 1: 6.207 s/op{color}
> {color:#ff}Iteration 2: 7.164 s/op{color}
> 

[jira] [Updated] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-31238:
---
Attachment: image-2023-03-09-15-46-01-176.png

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png, 
> image-2023-03-09-15-46-01-176.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means using the recovery method of generating sst files through 
> SstFileWriter in parallel)
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff}Iteration 1: 3.922 s/op{color}
> {color:#ff}Iteration 2: 3.208 s/op{color}
> {color:#ff}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff}Iteration 1: 6.207 s/op{color}
> {color:#ff}Iteration 2: 7.164 s/op{color}
> {color:#ff}Iteration 3: 6.397 

[jira] [Updated] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-31238:
---
Attachment: image-2023-03-09-15-45-56-081.png

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means using the recovery method of generating sst files through 
> SstFileWriter in parallel)
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff}Iteration 1: 3.922 s/op{color}
> {color:#ff}Iteration 2: 3.208 s/op{color}
> {color:#ff}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff}Iteration 1: 6.207 s/op{color}
> {color:#ff}Iteration 2: 7.164 s/op{color}
> {color:#ff}Iteration 3: 6.397 s/op{color}|
> |5G|Iteration 1: 244.795 s/op
> 

[GitHub] [flink] flinkbot commented on pull request #22136: [WIP][table] Support LATERAL/JOIN LATERAL temporal table join

2023-03-08 Thread via GitHub


flinkbot commented on PR #22136:
URL: https://github.com/apache/flink/pull/22136#issuecomment-1461493973

   
   ## CI report:
   
   * c7160f860d0960a5186c3472dbb6bd3d22dbdf71 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-31238:
---
Attachment: image-2023-03-09-15-41-08-379.png

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means using the recovery method of generating sst files through 
> SstFileWriter in parallel)
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff}Iteration 1: 3.922 s/op{color}
> {color:#ff}Iteration 2: 3.208 s/op{color}
> {color:#ff}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff}Iteration 1: 6.207 s/op{color}
> {color:#ff}Iteration 2: 7.164 s/op{color}
> {color:#ff}Iteration 3: 6.397 s/op{color}|
> |5G|Iteration 1: 244.795 s/op
> Iteration 2: 243.141 s/op
> 

[jira] [Updated] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-31238:
---
Attachment: image-2023-03-09-15-41-03-074.png

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means using the recovery method of generating sst files through 
> SstFileWriter in parallel)
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff}Iteration 1: 3.922 s/op{color}
> {color:#ff}Iteration 2: 3.208 s/op{color}
> {color:#ff}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff}Iteration 1: 6.207 s/op{color}
> {color:#ff}Iteration 2: 7.164 s/op{color}
> {color:#ff}Iteration 3: 6.397 s/op{color}|
> |5G|Iteration 1: 244.795 s/op
> Iteration 2: 243.141 s/op
> 

[jira] [Created] (FLINK-31376) CSVReader for streaming does not support splittable

2023-03-08 Thread ramkrishna.s.vasudevan (Jira)
ramkrishna.s.vasudevan created FLINK-31376:
--

 Summary: CSVReader for streaming does not support splittable
 Key: FLINK-31376
 URL: https://issues.apache.org/jira/browse/FLINK-31376
 Project: Flink
  Issue Type: Improvement
Reporter: ramkrishna.s.vasudevan


When using CsvReaderFormat, the StreamFormatAdapter that wraps it does not 
support 'splittable'. This task is targeted at supporting file splits when we 
create the FileSource over a CSV file. 
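
For context, creating a FileSource over a CSV file with CsvReaderFormat typically 
looks like the sketch below (the {{Order}} POJO and the path are placeholders, and 
the flink-connector-files and flink-csv dependencies are assumed). Because the 
underlying stream format currently reports itself as non-splittable, each CSV file 
ends up being read as a single split, which is what this ticket aims to improve.
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CsvFileSourceExample {
    /** Placeholder POJO; field names must match the CSV schema. */
    public static class Order {
        public long id;
        public double amount;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        CsvReaderFormat<Order> csvFormat = CsvReaderFormat.forPojo(Order.class);

        // FileSource wraps the StreamFormat in a StreamFormatAdapter internally;
        // since the CSV format is not splittable today, one file = one split.
        FileSource<Order> source =
                FileSource.forRecordStreamFormat(csvFormat, new Path("/tmp/orders.csv")).build();

        DataStream<Order> orders =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "csv-source");
        orders.print();

        env.execute("csv-file-source-example");
    }
}
{code}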



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] cshuo opened a new pull request, #22136: [FLINK-XXXX][table] Support LATERAL/JOIN LATERAL temporal table join

2023-03-08 Thread via GitHub


cshuo opened a new pull request, #22136:
URL: https://github.com/apache/flink/pull/22136

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-31238:
---
Attachment: image-2023-03-09-15-28-32-363.png

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means using the recovery method of generating sst files through 
> SstFileWriter in parallel)
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff}Iteration 1: 3.922 s/op{color}
> {color:#ff}Iteration 2: 3.208 s/op{color}
> {color:#ff}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff}Iteration 1: 6.207 s/op{color}
> {color:#ff}Iteration 2: 7.164 s/op{color}
> {color:#ff}Iteration 3: 6.397 s/op{color}|
> |5G|Iteration 1: 244.795 s/op
> Iteration 2: 243.141 s/op
> Iteration 3: 253.542 s/op|Iteration 1: 78.058 s/op
> Iteration 2: 85.635 s/op
> 

[jira] [Updated] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-31238:
---
Attachment: image-2023-03-09-15-26-12-314.png

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means using the recovery method of generating sst files through 
> SstFileWriter in parallel)
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff}Iteration 1: 3.922 s/op{color}
> {color:#ff}Iteration 2: 3.208 s/op{color}
> {color:#ff}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff}Iteration 1: 6.207 s/op{color}
> {color:#ff}Iteration 2: 7.164 s/op{color}
> {color:#ff}Iteration 3: 6.397 s/op{color}|
> |5G|Iteration 1: 244.795 s/op
> Iteration 2: 243.141 s/op
> Iteration 3: 253.542 s/op|Iteration 1: 78.058 s/op
> Iteration 2: 85.635 s/op
> Iteration 3: 76.568 

[jira] [Commented] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698215#comment-17698215
 ] 

Yue Ma commented on FLINK-31238:


[~masteryhx] Thanks for the reply, I understand your concerns. It's a good 
suggestion to push this to the RocksDB community.

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb is clipped
>  * Open the Compaction process of the main DB
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means using the recovery method of generating sst files through 
> SstFileWriter in parallel)
>  * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff}Iteration 1: 3.922 s/op{color}
> {color:#ff}Iteration 2: 3.208 s/op{color}
> {color:#ff}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff}Iteration 1: 6.207 s/op{color}
> {color:#ff}Iteration 2: 7.164 s/op{color}
> {color:#ff}Iteration 3: 6.397 s/op{color}|
> |5G|Iteration 1: 244.795 s/op
> Iteration 2: 243.141 s/op
> Iteration 3: 253.542 s/op|Iteration 1: 

[jira] [Updated] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2023-03-08 Thread Yue Ma (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yue Ma updated FLINK-31238:
---
Attachment: image-2023-03-09-15-23-30-581.png

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png
>
>
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
>  * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. It must be strictly guaranteed that the key ranges of the two DBs do not 
> overlap, which is easy to ensure in the Flink scenario. Hard links are used 
> when ingesting the files, so it is very fast. At the same time, the file 
> numbers of the main DB are incremented sequentially, and the SequenceNumber 
> of the main DB is updated to the larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows (a rough code sketch is given after the figure below):
>  * Open the first StateHandle as the main DB and pause compaction.
>  * Clip the main DB to the KeyGroup range of the Task with ClipDB.
>  * Open the other StateHandles in sequence as tmp DBs and perform ClipDb 
> according to the KeyGroup range.
>  * Ingest each tmp DB into the main DB after it has been clipped.
>  * Resume compaction of the main DB.
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
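> A minimal sketch of this restore sequence, assuming hypothetical frocksdb 
> bindings named clipDB/ingestDB and helper methods openWithCompactionPaused / 
> resumeCompaction (the real method names in our internal branch may differ):
> {code:java}
> static RocksDB restoreWithClipAndIngest(
>         List<KeyedStateHandle> handles, byte[] keyGroupBegin, byte[] keyGroupEnd)
>         throws Exception {
>     // 1. Open the first state handle as the main DB with auto-compaction paused.
>     RocksDB mainDb = openWithCompactionPaused(handles.get(0));
>     // 2. Drop everything outside this subtask's key-group range.
>     mainDb.clipDB(keyGroupBegin, keyGroupEnd);
>     // 3. Open the remaining handles as tmp DBs and clip them the same way.
>     for (KeyedStateHandle handle : handles.subList(1, handles.size())) {
>         try (RocksDB tmpDb = openWithCompactionPaused(handle)) {
>             tmpDb.clipDB(keyGroupBegin, keyGroupEnd);
>             // 4. Hard-link the clipped sst files of the tmp DB into the main DB.
>             mainDb.ingestDB(tmpDb);
>         }
>     }
>     // 5. Resume compaction on the main DB.
>     resumeCompaction(mainDb);
>     return mainDb;
> }
> {code}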
> We have run some benchmark tests on our internal Flink version. The results 
> show that, compared with the writeBatch method, IngestDb speeds up rescaling 
> recovery by 5 to 10 times, as shown below (SST_File_Writer denotes the 
> recovery method that generates sst files in parallel through SstFileWriter).
>  * Parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch* (Iter 1/2/3)|*SST_File_Writer* (Iter 1/2/3)|*Ingest_DB* (Iter 1/2/3)|
> |500M|8.018 / 9.551 / 7.486 s/op|6.041 / 5.934 / 6.707 s/op|3.922 / 3.208 / 3.096 s/op|
> |1G|19.686 / 19.402 / 21.146 s/op|17.538 / 16.933 / 15.486 s/op|6.207 / 7.164 / 6.397 s/op|
> |5G|244.795 / 243.141 / 253.542 s/op|78.058 / 85.635 / 76.568 s/op|23.397 s/op / …
> 

[GitHub] [flink] PatrickRen commented on pull request #22100: [FLINK-31208][Connectors / Kafka] KafkaSourceReader overrides meaning…

2023-03-08 Thread via GitHub


PatrickRen commented on PR #22100:
URL: https://github.com/apache/flink/pull/22100#issuecomment-1461451782

   Thanks for the patch @loserwang1024 ! LGTM. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-31240) Reduce the overhead of conversion between DataStream and Table

2023-03-08 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned FLINK-31240:


Assignee: Jiang Xin

> Reduce the overhead of conversion between DataStream and Table
> --
>
> Key: FLINK-31240
> URL: https://issues.apache.org/jira/browse/FLINK-31240
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Jiang Xin
>Assignee: Jiang Xin
>Priority: Major
>  Labels: pull-request-available
>
> In some cases, users may need to convert the underlying DataStream to a Table 
> and then convert it back to a DataStream (e.g. some Flink ML libraries accept 
> a Table as input and convert it to a DataStream for calculation). This causes 
> unnecessary overhead because of the data conversion between the internal and 
> external data types.
> We can reduce the overhead by checking whether there is a paired 
> `fromDataStream`/`toDataStream` call without any transformation in between; 
> if so, the source DataStream can be used directly (see the sketch below).
>  
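> A hedged illustration of the round trip described above, using the standard 
> StreamTableEnvironment bridge methods (the input data here is made up):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> DataStream<Row> input =
>         env.fromElements(Row.of(1L, "a"), Row.of(2L, "b"))
>                 .returns(Types.ROW(Types.LONG, Types.STRING));
> // DataStream -> Table -> DataStream with no operation on the Table in between.
> Table table = tEnv.fromDataStream(input);
> DataStream<Row> output = tEnv.toDataStream(table);
> // The proposal: when fromDataStream/toDataStream are paired like this without
> // any transformation, the planner could reuse `input` directly and skip both
> // the external->internal and internal->external conversions.
> {code}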



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31375) the values of map are truncated by the CASE WHEN function.

2023-03-08 Thread jeff-zou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jeff-zou updated FLINK-31375:
-
Description: 
the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map<string,string>) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result is '123456789', but I get '123', the length is 
limited by 'abc'.

 

  was:
the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map<string,string>) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result is '123456789', but I get '123'  which the length 
is limited by 'abc'.

 


> the values of map are truncated by the CASE WHEN function.
> -
>
> Key: FLINK-31375
> URL: https://issues.apache.org/jira/browse/FLINK-31375
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: jeff-zou
>Priority: Major
>
> the values of map are truncated by the CASE WHEN function.
> {code:java}
> // sql
> create table test (a map<string,string>) with ('connector'='print');
> insert into test  select * from (values(case when true then 
> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
> end));{code}
> the result:
> {code:java}
> +I[{test=123}] {code}
> We hope the value of result is '123456789', but I get '123', the length is 
> limited by 'abc'.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31375) the values of map are truncated by the CASE WHEN function.

2023-03-08 Thread jeff-zou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jeff-zou updated FLINK-31375:
-
Description: 
the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map<string,string>) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result is '123456789', but I get '123'  which the length 
is limited by 'abc'.

 

  was:
the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map<string,string>) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result is '123456789', but I get '123' .

 


> the values of map are truncated by the CASE WHEN function.
> -
>
> Key: FLINK-31375
> URL: https://issues.apache.org/jira/browse/FLINK-31375
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: jeff-zou
>Priority: Major
>
> the values of map are truncated by the CASE WHEN function.
> {code:java}
> // sql
> create table test (a map<string,string>) with ('connector'='print');
> insert into test  select * from (values(case when true then 
> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
> end));{code}
> the result:
> {code:java}
> +I[{test=123}] {code}
> We hope the value of result is '123456789', but I get '123'  which the length 
> is limited by 'abc'.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31375) the values of map are truncated by the CASE WHEN function.

2023-03-08 Thread jeff-zou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jeff-zou updated FLINK-31375:
-
Description: 
the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map<string,string>) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result is '123456789', but I get '123' .

 

  was:
the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result is '123456789', but I get '123'.

 


> the values of map are truncated by the CASE WHEN function.
> -
>
> Key: FLINK-31375
> URL: https://issues.apache.org/jira/browse/FLINK-31375
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: jeff-zou
>Priority: Major
>
> the values of map are truncated by the CASE WHEN function.
> {code:java}
> // sql
> create table test (a map<string,string>) with ('connector'='print');
> insert into test  select * from (values(case when true then 
> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
> end));{code}
> the result:
> {code:java}
> +I[{test=123}] {code}
> We hope the value of result is '123456789', but I get '123' .
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31375) the values of map are truncated by the CASE WHEN function.

2023-03-08 Thread jeff-zou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jeff-zou updated FLINK-31375:
-
Description: 
the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map<string,string>) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result is '123456789', but I get '123'.

 

  was:
the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result if '123456789', but I get '123'.

 


> the values of map are truncated by the CASE WHEN function.
> -
>
> Key: FLINK-31375
> URL: https://issues.apache.org/jira/browse/FLINK-31375
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: jeff-zou
>Priority: Major
>
> the values of map are truncated by the CASE WHEN function.
> {code:java}
> // sql
> create table test (a map<string,string>) with ('connector'='print');
> insert into test  select * from (values(case when true then 
> map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
> end));{code}
> the result:
> {code:java}
> +I[{test=123}] {code}
> We hope the value of result is '123456789', but I get '123'.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31375) the values of map are truncated by the CASE WHEN function.

2023-03-08 Thread jeff-zou (Jira)
jeff-zou created FLINK-31375:


 Summary: the values of map are truncated by the 
CASE WHEN function.
 Key: FLINK-31375
 URL: https://issues.apache.org/jira/browse/FLINK-31375
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.15.1
Reporter: jeff-zou


the values of map are truncated by the CASE WHEN function.
{code:java}
// sql
create table test (a map<string,string>) with ('connector'='print');
insert into test  select * from (values(case when true then 
map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] 
end));{code}
the result:
{code:java}
+I[{test=123}] {code}
We hope the value of result if '123456789', but I get '123'.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-08 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1130546283


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java:
##
@@ -611,7 +611,9 @@ private void addResolvedColumns(List columns) {
 } else if (c instanceof ComputedColumn) {
 final ComputedColumn computedColumn = 
(ComputedColumn) c;
 columnByExpression(
-computedColumn.getName(), 
computedColumn.getExpression());
+computedColumn.getName(),
+new SqlCallExpression(
+
computedColumn.getExpression().asSerializableString()));

Review Comment:
   Here, I modify the behavior by converting the resolvedExpression to an 
unresolvedExpression when using 
`Schema.newBuilder().fromResolvedSchema().build()`. This modification 
simplifies schema comparison during diff.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] LadyForest commented on pull request #21452: [FLINK-30282] Fix Logical type ROW lost inner field's nullability after converting to RelDataType

2023-03-08 Thread via GitHub


LadyForest commented on PR #21452:
URL: https://github.com/apache/flink/pull/21452#issuecomment-1461375359

   One concern is that for a nullable row type, does it make sense to let the 
inner field not be null?
   In other words, should we not support `ROW`? 
   WDYT @snuyanzin 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27018) timestamp missing end zero when outputing to kafka

2023-03-08 Thread jeff-zou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jeff-zou updated FLINK-27018:
-
Description: 
the bug is described as follows:

 
{code:java}
data in source:
 2022-04-02 03:34:21.260
but after sink by sql, the data in kafka:
 2022-04-02 03:34:21.26
{code}
 

data miss end zero in kafka.

 

sql:
{code:java}
create kafka_table(stime stimestamp) with ('connector'='kafka','format' = 
'json');
insert into kafka_table select stime from (values(timestamp '2022-04-02 
03:34:21.260')){code}
the value in kafka is : \{"stime":"2022-04-02 03:34:21.26"}, missed end zero.

  was:
the bug is described as follows:

 
{code:java}
data in source:
 2022-04-02 03:34:21.260
but after sink by sql, data in kafka:
 2022-04-02 03:34:21.26
{code}
 

data miss end zero in kafka.

 

sql:
{code:java}
create kafka_table(stime stimestamp) with ('connector'='kafka','format' = 
'json');
insert into kafka_table select stime from (values(timestamp '2022-04-02 
03:34:21.260')){code}
the value in kafka is : \{"stime":"2022-04-02 03:34:21.26"}, missed end zero.


> timestamp missing end  zero when outputing to kafka
> ---
>
> Key: FLINK-27018
> URL: https://issues.apache.org/jira/browse/FLINK-27018
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.5
>Reporter: jeff-zou
>Priority: Major
> Attachments: kafka.png
>
>
> the bug is described as follows:
>  
> {code:java}
> data in source:
>  2022-04-02 03:34:21.260
> but after sink by sql, the data in kafka:
>  2022-04-02 03:34:21.26
> {code}
>  
> data miss end zero in kafka.
>  
> sql:
> {code:java}
> create kafka_table(stime stimestamp) with ('connector'='kafka','format' = 
> 'json');
> insert into kafka_table select stime from (values(timestamp '2022-04-02 
> 03:34:21.260')){code}
> the value in kafka is : \{"stime":"2022-04-02 03:34:21.26"}, missed end zero.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] felixzh2020 commented on pull request #22118: [FLINK-31335][sql-gateway] When submit job to yarn, support kerberos

2023-03-08 Thread via GitHub


felixzh2020 commented on PR #22118:
URL: https://github.com/apache/flink/pull/22118#issuecomment-1461344401

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30513) HA storage dir leaks on cluster termination

2023-03-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698192#comment-17698192
 ] 

Weijie Guo commented on FLINK-30513:


master(1.18) via 13779ab8e4f5539ca311d9f233d031d818af6450.

> HA storage dir leaks on cluster termination 
> 
>
> Key: FLINK-30513
> URL: https://issues.apache.org/jira/browse/FLINK-30513
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-12-27-21-32-17-510.png
>
>
> *Problem*
> We found that HA storage dir leaks on cluster termination for a Flink job 
> with HA enabled. The following picture shows the HA storage dir (here on 
> HDFS) of the cluster czh-flink-test-offline (of application mode) after 
> cancelling the job with flink-cancel. We are left with an empty dir, and too 
> many empty dirs will greatly hurt the stability of HDFS NameNode!
> !image-2022-12-27-21-32-17-510.png|width=582,height=158!
>  
> Furthermore, in case the user chooses to retain the checkpoints on job 
> termination, we will have the completedCheckpoints leaked as well. Note that 
> we no longer need the completedCheckpoints files as we'll directly recover 
> retained CPs from the CP data dir.
> *Root Cause*
> When we run AbstractHaServices#closeAndCleanupAllData(), we cleaned up blob 
> store, but didn't clean the HA storage dir.
> *Proposal*
> Clean up the HA storage dir after cleaning up blob store in 
> AbstractHaServices#closeAndCleanupAllData().
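> A rough sketch of the proposed ordering, with cleanupHaStorageDir() as a 
> hypothetical new hook; the real structure of 
> AbstractHaServices#closeAndCleanupAllData() is simplified here:
> {code:java}
> public void closeAndCleanupAllData() throws Exception {
>     Throwable exception = null;
>     try {
>         internalCleanup(); // existing: clean up HA backend data
>     } catch (Throwable t) {
>         exception = t;
>     }
>     try {
>         blobStoreService.closeAndCleanupAllData(); // existing: clean up blob store
>         cleanupHaStorageDir(); // proposed: also delete the now-empty HA storage dir
>     } catch (Throwable t) {
>         exception = ExceptionUtils.firstOrSuppressed(t, exception);
>     }
>     internalClose();
>     if (exception != null) {
>         ExceptionUtils.rethrowException(exception);
>     }
> }
> {code}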



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30513) HA storage dir leaks on cluster termination

2023-03-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17698192#comment-17698192
 ] 

Weijie Guo edited comment on FLINK-30513 at 3/9/23 6:08 AM:


master(1.18) via 13779ab8e4f5539ca311d9f233d031d818af6450.
release-1.17 waiting for CI.


was (Author: weijie guo):
master(1.18) via 13779ab8e4f5539ca311d9f233d031d818af6450.

> HA storage dir leaks on cluster termination 
> 
>
> Key: FLINK-30513
> URL: https://issues.apache.org/jira/browse/FLINK-30513
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-12-27-21-32-17-510.png
>
>
> *Problem*
> We found that HA storage dir leaks on cluster termination for a Flink job 
> with HA enabled. The following picture shows the HA storage dir (here on 
> HDFS) of the cluster czh-flink-test-offline (of application mode) after 
> cancelling the job with flink-cancel. We are left with an empty dir, and too 
> many empty dirs will greatly hurt the stability of HDFS NameNode!
> !image-2022-12-27-21-32-17-510.png|width=582,height=158!
>  
> Furthermore, in case the user chooses to retain the checkpoints on job 
> termination, we will have the completedCheckpoints leaked as well. Note that 
> we no longer need the completedCheckpoints files as we'll directly recover 
> retained CPs from the CP data dir.
> *Root Cause*
> When we run AbstractHaServices#closeAndCleanupAllData(), we cleaned up blob 
> store, but didn't clean the HA storage dir.
> *Proposal*
> Clean up the HA storage dir after cleaning up blob store in 
> AbstractHaServices#closeAndCleanupAllData().



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on pull request #21673: [FLINK-30513] Cleanup HA storage path on cluster termination

2023-03-08 Thread via GitHub


reswqa commented on PR #21673:
URL: https://github.com/apache/flink/pull/21673#issuecomment-1461341895

   @X-czh merged. Could you open a backport pr to release-1.17?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa closed pull request #21673: [FLINK-30513] Cleanup HA storage path on cluster termination

2023-03-08 Thread via GitHub


reswqa closed pull request #21673: [FLINK-30513] Cleanup HA storage path on 
cluster termination
URL: https://github.com/apache/flink/pull/21673


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31298) ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException

2023-03-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-31298:
---
Fix Version/s: 1.17.0
   1.16.2

> ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows 
> IllegalArgumentException
> -
>
> Key: FLINK-31298
> URL: https://issues.apache.org/jira/browse/FLINK-31298
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, starter, test-stability
> Fix For: 1.17.0, 1.16.2
>
>
> FLINK-24156 introduced {{NetUtils.acceptWithoutTimeout}} which caused the 
> test to print the stack trace of an {{IllegalArgumentException}}:
> {code}
> Exception in thread "Thread-0" java.lang.IllegalArgumentException: 
> serverSocket SO_TIMEOUT option must be 0
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>   at 
> org.apache.flink.util.NetUtils.acceptWithoutTimeout(NetUtils.java:139)
>   at 
> org.apache.flink.runtime.net.ConnectionUtilsTest$1.run(ConnectionUtilsTest.java:83)
>   at java.lang.Thread.run(Thread.java:750)
> {code}
> This is also shown in the Maven output of CI runs and might cause confusion. 
> The test should be fixed.
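> One way the test could avoid the printed stack trace is to satisfy the 
> precondition that acceptWithoutTimeout checks, i.e. keep the server socket's 
> SO_TIMEOUT at 0; a hedged sketch, not necessarily the fix that was merged:
> {code:java}
> try (ServerSocket serverSocket = new ServerSocket(0)) {
>     serverSocket.setSoTimeout(0); // acceptWithoutTimeout requires SO_TIMEOUT == 0
>     Thread acceptor =
>             new Thread(
>                     () -> {
>                         try {
>                             NetUtils.acceptWithoutTimeout(serverSocket);
>                         } catch (IOException ignored) {
>                             // the test only needs the port to be listening
>                         }
>                     });
>     acceptor.start();
>     // ... exercise ConnectionUtils against serverSocket.getLocalPort() ...
> }
> {code}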



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31298) ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException

2023-03-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-31298.
--
Resolution: Fixed

> ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows 
> IllegalArgumentException
> -
>
> Key: FLINK-31298
> URL: https://issues.apache.org/jira/browse/FLINK-31298
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, starter, test-stability
>
> FLINK-24156 introduced {{NetUtils.acceptWithoutTimeout}} which caused the 
> test to print the stack trace of an {{IllegalArgumentException}}:
> {code}
> Exception in thread "Thread-0" java.lang.IllegalArgumentException: 
> serverSocket SO_TIMEOUT option must be 0
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>   at 
> org.apache.flink.util.NetUtils.acceptWithoutTimeout(NetUtils.java:139)
>   at 
> org.apache.flink.runtime.net.ConnectionUtilsTest$1.run(ConnectionUtilsTest.java:83)
>   at java.lang.Thread.run(Thread.java:750)
> {code}
> This is also shown in the Maven output of CI runs and might cause confusion. 
> The test should be fixed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31298) ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows IllegalArgumentException

2023-03-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697743#comment-17697743
 ] 

Weijie Guo edited comment on FLINK-31298 at 3/9/23 6:00 AM:


master(1.18) via 7c5b7be5bc165a9799f10b5761a6ff15edee43b6.
release-1.17 via 704076a36024d521957e4e2f31820bbad7a102b3.
release-1.16 via b7b1cced495e29075adda10496238f251fe74d53.


was (Author: weijie guo):
master(1.18) via 7c5b7be5bc165a9799f10b5761a6ff15edee43b6.
release-1.15 waiting for CI.
release-1.16 waiting for CI.

> ConnectionUtilsTest.testFindConnectingAddressWhenGetLocalHostThrows swallows 
> IllegalArgumentException
> -
>
> Key: FLINK-31298
> URL: https://issues.apache.org/jira/browse/FLINK-31298
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, starter, test-stability
>
> FLINK-24156 introduced {{NetUtils.acceptWithoutTimeout}} which caused the 
> test to print the stack trace of an {{IllegalArgumentException}}:
> {code}
> Exception in thread "Thread-0" java.lang.IllegalArgumentException: 
> serverSocket SO_TIMEOUT option must be 0
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>   at 
> org.apache.flink.util.NetUtils.acceptWithoutTimeout(NetUtils.java:139)
>   at 
> org.apache.flink.runtime.net.ConnectionUtilsTest$1.run(ConnectionUtilsTest.java:83)
>   at java.lang.Thread.run(Thread.java:750)
> {code}
> This is also shown in the Maven output of CI runs and might cause confusion. 
> The test should be fixed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa merged pull request #22129: [FLINK-31298] backport to branch release-1.17

2023-03-08 Thread via GitHub


reswqa merged PR #22129:
URL: https://github.com/apache/flink/pull/22129


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa merged pull request #22128: [FLINK-31298] backport to branch release-1.16

2023-03-08 Thread via GitHub


reswqa merged PR #22128:
URL: https://github.com/apache/flink/pull/22128


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lightzhao commented on pull request #22126: [Bug][FLINK-31363]KafkaSink failed to commit transactions under EXACTLY_ONCE semantics.

2023-03-08 Thread via GitHub


lightzhao commented on PR #22126:
URL: https://github.com/apache/flink/pull/22126#issuecomment-1461330269

   @fapaul @AHeise PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

2023-03-08 Thread via GitHub


lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1130375623


##
flink-ml-core/src/test/java/org/apache/flink/ml/api/ExampleStages.java:
##
@@ -110,6 +111,10 @@ public static SumModel load(StreamTableEnvironment tEnv, 
String path) throws IOE
 SumModel model = ReadWriteUtils.loadStageParam(path);
 return model.setModelData(modelDataTable);
 }
+
+public static SumModelServable loadServable(String path) throws 
IOException {

Review Comment:
   Can we add a test for this method directly (rather than via reflection), 
similar to the existing `ReadWriteUtilsTest#testModelSaveLoad`? This would 
allow developers to quickly identify via the IDE where this method is tested. 
   
   And it is also more consistent with the idea that we want to test every 
public API in the same way as how users are going to use it.
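   A hedged sketch of the kind of direct test being suggested (the folder rule 
and helper names are assumptions, not the existing test code):
   ```
   @Test
   public void testLoadServable() throws Exception {
       String path = tempFolder.newFolder().getAbsolutePath();
       trainedSumModel.save(path); // a SumModel fitted earlier in the test
       env.execute();              // materialize the saved model data

       SumModelServable servable = SumModel.loadServable(path);
       DataFrame output = servable.transform(inputDataFrame);
       // assert the output rows against the expected sums ...
   }
   ```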



##
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/builder/ExampleServables.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.builder;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.ModelServable;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Defines Servable subclasses to be used in unit tests. */
+public class ExampleServables {
+
+/**
+ * A {@link TransformerServable} subclass that increments every value in 
the input dataframe by
+ * `delta` and outputs the resulting values.
+ */
+public static class SumModelServable implements 
ModelServable {
+
+private static final String COL_NAME = "input";
+
+private final Map, Object> paramMap = new HashMap<>();
+
+private int delta;
+
+public SumModelServable() {
+ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+}
+
+@Override
+public DataFrame transform(DataFrame input) {
+List outputRows = new ArrayList<>();
+for (Row row : input.collect()) {
+assert row.size() == 1;
+int originValue = (Integer) row.get(0);
+outputRows.add(new Row(Collections.singletonList(originValue + 
delta)));
+}
+return new DataFrame(
+Collections.singletonList(COL_NAME),
+Collections.singletonList(DataTypes.INT),
+outputRows);
+}
+
+@Override
+public Map, Object> getParamMap() {
+return paramMap;
+}
+
+public static SumModelServable load(String path) throws IOException {
+SumModelServable servable =
+ServableReadWriteUtils.loadServableParam(path, 
SumModelServable.class);
+
+try (InputStream inputStream = 
ServableReadWriteUtils.loadModelData(path)) {
+DataInputViewStreamWrapper dataInputViewStreamWrapper =
+new DataInputViewStreamWrapper(inputStream);
+int delta = 
IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);

Review Comment:
   Would it be simpler to do the following and remove the method 
`setModelData()`?
   
   ```
   servable.delta = 
IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
   return servable;
   ```



##
flink-ml-servable-core/src/main/java/org/apache/flink/ml/util/ServableReadWriteUtils.java:
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more 

[GitHub] [flink] X-czh commented on pull request #21673: [FLINK-30513] Cleanup HA storage path on cluster termination

2023-03-08 Thread via GitHub


X-czh commented on PR #21673:
URL: https://github.com/apache/flink/pull/21673#issuecomment-1461279033

   @reswqa Kind reminder: could you help merge the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31346) Batch shuffle IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0

2023-03-08 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697402#comment-17697402
 ] 

Weijie Guo edited comment on FLINK-31346 at 3/9/23 4:21 AM:


master(1.18) via 5ad2ae2c24ade2655981f609298978d26329466f.
release-1.17 via 54c67e5e08c11ef9a538abbf14618f9e27be18f7.
release-1.16 via 860ce4f57b2599516cd199a20204c047ca34c1e3.


was (Author: weijie guo):
master(1.18) via 5ad2ae2c24ade2655981f609298978d26329466f.
release-1.17 via 54c67e5e08c11ef9a538abbf14618f9e27be18f7.

> Batch shuffle IO scheduler does not throw TimeoutException if 
> numRequestedBuffers is greater than 0
> ---
>
> Key: FLINK-31346
> URL: https://issues.apache.org/jira/browse/FLINK-31346
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.1
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> We currently rely on throwing an exception to trigger downstream task 
> failover and avoid a read buffer request deadlock. But if 
> {{numRequestedBuffers}} is greater than 0, the IO scheduler does not throw a 
> {{TimeoutException}}. This will cause a deadlock.
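> A hedged sketch of the check being described, with made-up variable names 
> rather than the actual IO scheduler code:
> {code:java}
> // Fail the read with a TimeoutException once the request has waited longer than
> // the configured timeout, even if some buffers were already obtained
> // (numRequestedBuffers > 0), so the downstream task fails over instead of
> // deadlocking.
> while (buffers.size() < numRequiredBuffers) {
>     List<MemorySegment> allocated = bufferPool.requestBuffers();
>     buffers.addAll(allocated);
>     numRequestedBuffers += allocated.size();
>     if (buffers.size() < numRequiredBuffers
>             && System.nanoTime() - requestStartNanos > bufferRequestTimeout.toNanos()) {
>         throw new TimeoutException("Buffer request timed out after " + bufferRequestTimeout);
>     }
> }
> {code}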



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31346) Batch shuffle IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0

2023-03-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-31346.
--
Fix Version/s: 1.16.2
   Resolution: Fixed

> Batch shuffle IO scheduler does not throw TimeoutException if 
> numRequestedBuffers is greater than 0
> ---
>
> Key: FLINK-31346
> URL: https://issues.apache.org/jira/browse/FLINK-31346
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.16.1
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> We currently rely on throwing an exception to trigger downstream task 
> failover and avoid a read buffer request deadlock. But if 
> {{numRequestedBuffers}} is greater than 0, the IO scheduler does not throw a 
> {{TimeoutException}}. This will cause a deadlock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31346) Batch shuffle IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0

2023-03-08 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-31346:
---
Affects Version/s: 1.17.0

> Batch shuffle IO scheduler does not throw TimeoutException if 
> numRequestedBuffers is greater than 0
> ---
>
> Key: FLINK-31346
> URL: https://issues.apache.org/jira/browse/FLINK-31346
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2
>
>
> We currently rely on throwing an exception to trigger downstream task 
> failover and avoid a read buffer request deadlock. But if 
> {{numRequestedBuffers}} is greater than 0, the IO scheduler does not throw a 
> {{TimeoutException}}. This will cause a deadlock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa merged pull request #22123: [BP-1.16][FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0.

2023-03-08 Thread via GitHub


reswqa merged PR #22123:
URL: https://github.com/apache/flink/pull/22123


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI

2023-03-08 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697859#comment-17697859
 ] 

Shengkai Fang edited comment on FLINK-31351 at 3/9/23 3:16 AM:
---

Merged into release-1.16: 6fd3b9b338433d1e8240a1598bda883ef01cc9c4
Merged into release-1.17: 32b370181853f4129fd237c6a57491863a7e8b8c
Merged into master: 384d6b10a2d69b9384052c3d4c3ad82babd201d1


was (Author: fsk119):
Merged into release-1.17: 32b370181853f4129fd237c6a57491863a7e8b8c
Merged into master: 384d6b10a2d69b9384052c3d4c3ad82babd201d1

> HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 
> times out on CI
> -
>
> Key: FLINK-31351
> URL: https://issues.apache.org/jira/browse/FLINK-31351
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: David Morávek
>Assignee: Shengkai Fang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908]
>  
> {code:java}
> Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 
> tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000]
> Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping)
> Mar 06 18:28:56   at java.lang.Thread.sleep(Native Method)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown
>  Source)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709)
> Mar 06 18:28:56   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 18:28:56   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 18:28:56   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 18:28:56   at java.lang.reflect.Method.invoke(Method.java:498)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI

2023-03-08 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-31351.
-
Resolution: Fixed

> HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 
> times out on CI
> -
>
> Key: FLINK-31351
> URL: https://issues.apache.org/jira/browse/FLINK-31351
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: David Morávek
>Assignee: Shengkai Fang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908]
>  
> {code:java}
> Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 
> tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000]
> Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping)
> Mar 06 18:28:56   at java.lang.Thread.sleep(Native Method)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown
>  Source)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709)
> Mar 06 18:28:56   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 18:28:56   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 18:28:56   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 18:28:56   at java.lang.reflect.Method.invoke(Method.java:498)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31351) HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 times out on CI

2023-03-08 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697859#comment-17697859
 ] 

Shengkai Fang edited comment on FLINK-31351 at 3/9/23 3:15 AM:
---

Merged into release-1.17: 32b370181853f4129fd237c6a57491863a7e8b8c
Merged into master: 384d6b10a2d69b9384052c3d4c3ad82babd201d1


was (Author: fsk119):
Merged into release-1.17: 32b370181853f4129fd237c6a57491863a7e8b8c

> HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2 
> times out on CI
> -
>
> Key: FLINK-31351
> URL: https://issues.apache.org/jira/browse/FLINK-31351
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.0, 1.16.1, 1.18.0
>Reporter: David Morávek
>Assignee: Shengkai Fang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46872=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=24908]
>  
> {code:java}
> Mar 06 18:28:56 "ForkJoinPool-1-worker-25" #27 daemon prio=5 os_prio=0 
> tid=0x7ff4b1832000 nid=0x21b2 waiting on condition [0x7ff3a8c3e000]
> Mar 06 18:28:56java.lang.Thread.State: TIMED_WAITING (sleeping)
> Mar 06 18:28:56   at java.lang.Thread.sleep(Native Method)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.waitUntilJobIsRunning(HiveServer2EndpointITCase.java:1004)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.lambda$testExecuteStatementInSyncModeWithRuntimeException2$37(HiveServer2EndpointITCase.java:711)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase$$Lambda$2018/2127600974.accept(Unknown
>  Source)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.runExecuteStatementInSyncModeWithRuntimeException(HiveServer2EndpointITCase.java:999)
> Mar 06 18:28:56   at 
> org.apache.flink.table.endpoint.hive.HiveServer2EndpointITCase.testExecuteStatementInSyncModeWithRuntimeException2(HiveServer2EndpointITCase.java:709)
> Mar 06 18:28:56   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 18:28:56   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 18:28:56   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 18:28:56   at java.lang.reflect.Method.invoke(Method.java:498)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fsk119 merged pull request #22133: [FLINK-31351][sql-gateway] Don't stop the stuck thread by force

2023-03-08 Thread via GitHub


fsk119 merged PR #22133:
URL: https://github.com/apache/flink/pull/22133


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fsk119 merged pull request #22132: [FLINK-31351][sql-gateway] Don't stop the stuck thread by force

2023-03-08 Thread via GitHub


fsk119 merged PR #22132:
URL: https://github.com/apache/flink/pull/22132


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #220: [FLINK-31325] Improve performance of Swing

2023-03-08 Thread via GitHub


zhipeng93 commented on code in PR #220:
URL: https://github.com/apache/flink-ml/pull/220#discussion_r1130294319


##
flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java:
##
@@ -140,6 +141,7 @@ public void testParam() {
 assertEquals(5, swing.getAlpha1());
 assertEquals(1, swing.getAlpha2());
 assertEquals(0.35, swing.getBeta(), 1e-9);
+assertEquals(1, swing.getSeed());

Review Comment:
   Let's also check the default value of `seed`.



##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##
@@ -274,13 +272,29 @@ private static class ComputingSimilarItems extends 
AbstractStreamOperator
 private static final Character commaDelimiter = ',';
 private static final Character semicolonDelimiter = ';';
 
+private final Random random;
+
+private Map userAndPurchasedItems = new HashMap<>();
+private Map> itemAndPurchasers = new HashMap<>();
+
+private ListState> userAndPurchasedItemsState;
+private ListState>> itemAndPurchasersState;
+
 private ComputingSimilarItems(
-int k, int maxUserNumPerItem, int alpha1, int alpha2, double 
beta) {
+int k,
+int maxUserNumPerItem,
+int maxUserBehavior,
+int alpha1,
+int alpha2,
+double beta,
+long seed) {
 this.k = k;
 this.maxUserNumPerItem = maxUserNumPerItem;
+this.maxUserBehavior = maxUserBehavior;
 this.alpha1 = alpha1;
 this.alpha2 = alpha2;
 this.beta = beta;
+this.random = new Random(seed);

Review Comment:
   Is there a test case that uses a non-default seed? I would expect using 
different seeds would lead to different output but did not find it.
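   A hedged example of the kind of test being asked for (input construction and 
result collection are elided, setter names assumed from the surrounding diff):
   ```
   @Test
   public void testNonDefaultSeed() throws Exception {
       // maxUserNumPerItem is set low enough that user sampling actually kicks in.
       Swing swingA = new Swing().setMaxUserNumPerItem(2).setSeed(1L);
       Swing swingB = new Swing().setMaxUserNumPerItem(2).setSeed(2L);
       Table outputA = swingA.transform(inputTable)[0];
       Table outputB = swingB.transform(inputTable)[0];
       // Different seeds should sample different user lists and therefore
       // produce different similarity scores.
       assertNotEquals(collectScores(outputA), collectScores(outputB));
   }
   ```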



##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##
@@ -289,36 +303,40 @@ public void endInput() throws Exception {
 Map userWeights = new 
HashMap<>(userAndPurchasedItems.size());
 userAndPurchasedItems.forEach(
 (k, v) -> {
-int count = v.size();
+int count = v.length;
 userWeights.put(k, calculateWeight(count));
 });
 
 for (long mainItem : itemAndPurchasers.keySet()) {
-List userList =
-sampleUserList(itemAndPurchasers.get(mainItem), 
maxUserNumPerItem);
+List userList = new 
ArrayList<>(itemAndPurchasers.get(mainItem));

Review Comment:
   Is creating a new list (which introduces extra object copy) necessary?



##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##
@@ -289,36 +303,40 @@ public void endInput() throws Exception {
 Map userWeights = new 
HashMap<>(userAndPurchasedItems.size());
 userAndPurchasedItems.forEach(
 (k, v) -> {
-int count = v.size();
+int count = v.length;
 userWeights.put(k, calculateWeight(count));
 });
 
 for (long mainItem : itemAndPurchasers.keySet()) {
-List userList =
-sampleUserList(itemAndPurchasers.get(mainItem), 
maxUserNumPerItem);
+List userList = new 
ArrayList<>(itemAndPurchasers.get(mainItem));
+long[] interaction = new long[maxUserBehavior];
 HashMap id2swing = new HashMap<>();
 
-for (int i = 0; i < userList.size(); i++) {
+for (int i = 1; i < userList.size(); i++) {
 long u = userList.get(i);
+int interactionSize = 0;

Review Comment:
   nit: Variable 'interactionSize' initializer '0' is redundant 



##
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##
@@ -289,36 +303,40 @@ public void endInput() throws Exception {
 Map userWeights = new 
HashMap<>(userAndPurchasedItems.size());
 userAndPurchasedItems.forEach(
 (k, v) -> {
-int count = v.size();
+int count = v.length;
 userWeights.put(k, calculateWeight(count));
 });
 
 for (long mainItem : itemAndPurchasers.keySet()) {
-List userList =
-sampleUserList(itemAndPurchasers.get(mainItem), 
maxUserNumPerItem);
+List userList = new 
ArrayList<>(itemAndPurchasers.get(mainItem));
+long[] interaction = new long[maxUserBehavior];

Review Comment:
   nit: we can put this outside the loop for efficiency.



##
flink-ml-python/pyflink/ml/recommendation/tests/test_swing.py:

[GitHub] [flink] LadyForest commented on a diff in pull request #21452: [FLINK-30282] Fix Logical type ROW lost inner field's nullability after converting to RelDataType

2023-03-08 Thread via GitHub


LadyForest commented on code in PR #21452:
URL: https://github.com/apache/flink/pull/21452#discussion_r1130310752


##
flink-table/flink-sql-client/src/test/resources/sql/table.q:
##
@@ -964,7 +998,7 @@ CREATE TABLE IF NOT EXISTS daily_orders (
  PRIMARY KEY(dt, `user`) NOT ENFORCED
 ) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
- 'path' = '$VAR_BATCH_PATH',
+ 'path' = 
'/var/folders/xd/9dp1y4vd3h56kjkvdk426l50gp/T/junit5781726749537647921/7bf278b4-9932-4e24-af7b-1126df4667405030013249695173150',

Review Comment:
   > this i didn't get... can you please elaborate where does this magic name 
come from?
   
   Good catch! `$VAR_BATCH_PATH` is defined in `CliClientITCase` and serves as 
a placeholder to be replaced at runtime. I overwrote this by mistake and will 
revert this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-03-08 Thread via GitHub


liuyongvs commented on PR #21993:
URL: https://github.com/apache/flink/pull/21993#issuecomment-1461165977

   Hi @snuyanzin, thanks for your review; I learned a lot.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-03-08 Thread via GitHub


liuyongvs commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1130305597


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -102,6 +105,16 @@ Stream getTestSetSpecs() {
 "ARRAY_CONTAINS(f4, NULL)",
 true,
 DataTypes.BOOLEAN().nullable())
+.testResult(
+$("f5").arrayContains(lit(null, 
DataTypes.INT())),
+"ARRAY_CONTAINS(f5, CAST(NULL AS INT))",
+false,
+DataTypes.BOOLEAN().notNull())
+.testResult(
+$("f5").arrayContains(lit(4, 
DataTypes.INT().notNull())),
+"ARRAY_CONTAINS(f5, 4)",
+false,
+DataTypes.BOOLEAN().notNull())

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-03-08 Thread via GitHub


liuyongvs commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1130306112


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:
##
@@ -42,9 +42,14 @@ public Optional inferArgumentType(
 CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final ArrayType haystackType =
 (ArrayType) 
callContext.getArgumentDataTypes().get(0).getLogicalType();
-final LogicalType haystackElementType = haystackType.getElementType();
 final LogicalType needleType =
 
callContext.getArgumentDataTypes().get(argumentPos).getLogicalType();
+LogicalType haystackElementType = haystackType.getElementType();
+
+if (!haystackElementType.isNullable() && needleType.isNullable()) {
+haystackElementType = haystackType.getElementType().copy(true);

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ruanhang1993 closed pull request #17815: [FLINK-22702][tests] Add test data supplier which provide null timestamp field to kafka connector tests

2023-03-08 Thread via GitHub


ruanhang1993 closed pull request #17815: [FLINK-22702][tests] Add test data 
supplier which provide null timestamp field to kafka connector tests
URL: https://github.com/apache/flink/pull/17815


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ruanhang1993 closed pull request #18497: [FLINK-25290][tests] add table tests for connector testframe

2023-03-08 Thread via GitHub


ruanhang1993 closed pull request #18497: [FLINK-25290][tests] add table tests 
for connector testframe
URL: https://github.com/apache/flink/pull/18497


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store

2023-03-08 Thread via GitHub


zhangjun0x01 commented on PR #584:
URL: 
https://github.com/apache/flink-table-store/pull/584#issuecomment-1461145922

   > @zhangjun0x01 Can we consider to use bucket number as streaming default 
parallelism? And use parallelism inference only for batch source.
   
   I updated it and disabled `scan.infer-parallelism` by default. It is fine; the 
issue may really have been caused by too much parallelism.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] zhangjun0x01 commented on pull request #584: [FLINK-31338] support infer parallelism for flink table store

2023-03-08 Thread via GitHub


zhangjun0x01 commented on PR #584:
URL: 
https://github.com/apache/flink-table-store/pull/584#issuecomment-1461142959

   
   
   
   > The same problem of E2E test I have met before. I've pushed some commits 
to try to solve the problem. You can rebase master.
   Yeah, I resubmitted.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31374) ProxyStreamPartitioner should implement ConfigurableStreamPartitioner

2023-03-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31374:
-

 Summary: ProxyStreamPartitioner should implement 
ConfigurableStreamPartitioner
 Key: FLINK-31374
 URL: https://issues.apache.org/jira/browse/FLINK-31374
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


In the flink-ml-iterations module, we use ProxyStreamPartitioner to wrap 
StreamPartitioner so that records can be handled in iterations.

However, it does not implement the ConfigurableStreamPartitioner interface, so 
the maxParallelism information is lost.
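A minimal sketch of the forwarding this would require (class name and structure are hypothetical; the real ProxyStreamPartitioner also extends StreamPartitioner and delegates channel selection, which is omitted here):

{code:java}
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;

/** Hypothetical wrapper showing only the configure() forwarding. */
public final class ConfigurableProxyPartitionerSketch<T> implements ConfigurableStreamPartitioner {

    private final StreamPartitioner<T> wrappedPartitioner;

    public ConfigurableProxyPartitionerSketch(StreamPartitioner<T> wrappedPartitioner) {
        this.wrappedPartitioner = wrappedPartitioner;
    }

    @Override
    public void configure(int maxParallelism) {
        // Forward maxParallelism so that key-group assignment of the wrapped
        // partitioner stays consistent with the job's max parallelism.
        if (wrappedPartitioner instanceof ConfigurableStreamPartitioner) {
            ((ConfigurableStreamPartitioner) wrappedPartitioner).configure(maxParallelism);
        }
    }
}
{code}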



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31373) PerRoundWrapperOperator should carry epoch information in watermark

2023-03-08 Thread Zhipeng Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31373:
--
Description: 
Currently we use PerRoundWrapperOperator to wrap the normal Flink operators so 
that they can be used in iterations.

Each record already carries the epoch information, so we know which iteration 
it belongs to.

However, there is no epoch information when the stream element is a watermark. 
This works in most cases, but fails to address the following use case:
 - In DataStreamUtils#withBroadcast, we cache the elements (including 
watermarks) from non-broadcast inputs until the broadcast variables are ready. 
Once the broadcast variables are ready and we receive a stream element, we 
process the cached elements first. If the received element is a watermark, the 
current implementation of the iteration module fails (ProxyOutput#collect 
throws an NPE) because there is no epoch information.

  was:
Currently we use `PerRoundWrapperOperator` to wrap the normal flink operators 
such that they can be used in iterations.


We already contained the epoch information in each record so that we know which 
iteration each record belongs to.

However, there is no epoch information when the stream element is a watermark. 
This works in most cases, but fail to address the following issue:

- In DataStreamUtils#withBroadcast, we will cache the elements (including 
watermarks) from non-broadcast inputs until the broadcast variables are ready. 
When the broadcast variables are ready, once we receive a stream element we 
will process the cached elements first. If the received element is a watermark, 
the current implementation of iteration module fails (`ProxyOutput#collect` 
throws NPE) since there is no epoch  information.


> PerRoundWrapperOperator should carry epoch information in watermark
> ---
>
> Key: FLINK-31373
> URL: https://issues.apache.org/jira/browse/FLINK-31373
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Affects Versions: ml-2.2.0
>Reporter: Zhipeng Zhang
>Priority: Major
>
> Currently we use PerRoundWrapperOperator to wrap the normal Flink operators 
> so that they can be used in iterations.
> Each record already carries the epoch information, so we know which iteration 
> it belongs to.
> However, there is no epoch information when the stream element is a 
> watermark. This works in most cases, but fails to address the following use 
> case:
>  - In DataStreamUtils#withBroadcast, we cache the elements (including 
> watermarks) from non-broadcast inputs until the broadcast variables are 
> ready. Once the broadcast variables are ready and we receive a stream 
> element, we process the cached elements first. If the received element is a 
> watermark, the current implementation of the iteration module fails 
> (ProxyOutput#collect throws an NPE) because there is no epoch information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31373) PerRoundWrapperOperator should carry epoch information in watermark

2023-03-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-31373:
-

 Summary: PerRoundWrapperOperator should carry epoch information in 
watermark
 Key: FLINK-31373
 URL: https://issues.apache.org/jira/browse/FLINK-31373
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.2.0
Reporter: Zhipeng Zhang


Currently we use `PerRoundWrapperOperator` to wrap the normal Flink operators 
so that they can be used in iterations.

Each record already carries the epoch information, so we know which iteration 
it belongs to.

However, there is no epoch information when the stream element is a watermark. 
This works in most cases, but fails to address the following issue:

- In DataStreamUtils#withBroadcast, we cache the elements (including 
watermarks) from non-broadcast inputs until the broadcast variables are ready. 
Once the broadcast variables are ready and we receive a stream element, we 
process the cached elements first. If the received element is a watermark, the 
current implementation of the iteration module fails (`ProxyOutput#collect` 
throws an NPE) because there is no epoch information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-03-08 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1130032947


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -102,6 +105,16 @@ Stream getTestSetSpecs() {
 "ARRAY_CONTAINS(f4, NULL)",
 true,
 DataTypes.BOOLEAN().nullable())
+.testResult(
+$("f5").arrayContains(lit(null, 
DataTypes.INT())),
+"ARRAY_CONTAINS(f5, CAST(NULL AS INT))",
+false,
+DataTypes.BOOLEAN().notNull())
+.testResult(
+$("f5").arrayContains(lit(4, 
DataTypes.INT().notNull())),
+"ARRAY_CONTAINS(f5, 4)",
+false,
+DataTypes.BOOLEAN().notNull())

Review Comment:
   It would be nice to have a test checking that an array containing `null` 
returns `true` as the expected result.
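   For reference, a minimal, unverified sketch of what those calls look like end to end (it assumes a Flink version that ships ARRAY_CONTAINS with this fix; the expected results follow the expectations discussed in this thread, they are not verified here):
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class ArrayContainsNullDemo {
       public static void main(String[] args) {
           TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
   
           // Nullable NULL needle against a NOT NULL element array: type inference
           // should widen the element type instead of failing; expected FALSE.
           env.executeSql("SELECT ARRAY_CONTAINS(ARRAY[1, 2, 3], CAST(NULL AS INT))").print();
   
           // Array that actually contains NULL: expected TRUE, as requested above.
           env.executeSql("SELECT ARRAY_CONTAINS(ARRAY[1, CAST(NULL AS INT)], CAST(NULL AS INT))").print();
       }
   }
   ```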



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-03-08 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1130031361


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:
##
@@ -42,9 +42,14 @@ public Optional inferArgumentType(
 CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final ArrayType haystackType =
 (ArrayType) 
callContext.getArgumentDataTypes().get(0).getLogicalType();
-final LogicalType haystackElementType = haystackType.getElementType();
 final LogicalType needleType =
 
callContext.getArgumentDataTypes().get(argumentPos).getLogicalType();
+LogicalType haystackElementType = haystackType.getElementType();
+
+if (!haystackElementType.isNullable() && needleType.isNullable()) {
+haystackElementType = haystackType.getElementType().copy(true);

Review Comment:
   ```suggestion
   haystackElementType = haystackElementType.copy(true);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22135: [FLINK-31369][sql-gateway] Harden modifiers in tests for sql-gateway module

2023-03-08 Thread via GitHub


flinkbot commented on PR #22135:
URL: https://github.com/apache/flink/pull/22135#issuecomment-1460876245

   
   ## CI report:
   
   * 7c1d6079a4ba2323b410cf4209695b468ec735fd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31369) Harden modifiers for sql-gateway module

2023-03-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31369:
---
Labels: pull-request-available  (was: )

> Harden modifiers for sql-gateway module
> ---
>
> Key: FLINK-31369
> URL: https://issues.apache.org/jira/browse/FLINK-31369
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway, Tests
>Reporter: Sergey Nuyanzin
>Priority: Minor
>  Labels: pull-request-available
>
> This is a follow up jira issue for 
> https://github.com/apache/flink/pull/22127#discussion_r1129192778



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin opened a new pull request, #22135: [FLINK-31369][sql-gateway] Harden modifiers in tests for sql-gateway module

2023-03-08 Thread via GitHub


snuyanzin opened a new pull request, #22135:
URL: https://github.com/apache/flink/pull/22135

   ## What is the purpose of the change
   
   The PR hardens modifiers for junit5 tests
   
   as a follow up for 
https://github.com/apache/flink/pull/22127#discussion_r1129192778
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: ( no)
 - The runtime per-record code paths (performance sensitive):  (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: ( no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable )
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31133) PartiallyFinishedSourcesITCase hangs if a checkpoint fails

2023-03-08 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-31133:
--
Fix Version/s: 1.15.4
   (was: 1.15.5)

> PartiallyFinishedSourcesITCase hangs if a checkpoint fails
> --
>
> Key: FLINK-31133
> URL: https://issues.apache.org/jira/browse/FLINK-31133
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.3, 1.16.1, 1.18.0, 1.17.1
>Reporter: Matthias Pohl
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.15.4, 1.16.2, 1.18.0, 1.17.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46299=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b
> This build ran into a timeout. Based on the stacktraces reported, it was 
> either caused by 
> [SnapshotMigrationTestBase.restoreAndExecute|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46299=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=13475]:
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7f23d800b800 nid=0x60cdd waiting on 
> condition [0x7f23e1c0d000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.restoreAndExecute(SnapshotMigrationTestBase.java:382)
>   at 
> org.apache.flink.test.migration.TypeSerializerSnapshotMigrationITCase.testSnapshot(TypeSerializerSnapshotMigrationITCase.java:172)
>   at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
> [...]
> {code}
> or 
> [PartiallyFinishedSourcesITCase.test|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46299=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=10401]:
> {code}
> 2023-02-20T07:13:05.6084711Z "main" #1 prio=5 os_prio=0 
> tid=0x7fd35c00b800 nid=0x8c8a waiting on condition [0x7fd363d0f000]
> 2023-02-20T07:13:05.6085149Zjava.lang.Thread.State: TIMED_WAITING 
> (sleeping)
> 2023-02-20T07:13:05.6085487Z  at java.lang.Thread.sleep(Native Method)
> 2023-02-20T07:13:05.6085925Z  at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> 2023-02-20T07:13:05.6086512Z  at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:138)
> 2023-02-20T07:13:05.6087103Z  at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitForSubtasksToFinish(CommonTestUtils.java:291)
> 2023-02-20T07:13:05.6087730Z  at 
> org.apache.flink.runtime.operators.lifecycle.TestJobExecutor.waitForSubtasksToFinish(TestJobExecutor.java:226)
> 2023-02-20T07:13:05.6088410Z  at 
> org.apache.flink.runtime.operators.lifecycle.PartiallyFinishedSourcesITCase.test(PartiallyFinishedSourcesITCase.java:138)
> 2023-02-20T07:13:05.6088957Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}
> Still, it sounds odd: Based on a code analysis it's quite unlikely that those 
> two caused the issue. The former one has a 5 min timeout (see related code in 
> [SnapshotMigrationTestBase:382|https://github.com/apache/flink/blob/release-1.15/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java#L382]).
>  For the other one, we found it being not responsible in the past when some 
> other concurrent test caused the issue (see FLINK-30261).
> An investigation on where we lose the time for the timeout revealed that 
> {{AdaptiveSchedulerITCase}} took 2980s to finish (see [build 
> logs|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46299=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=5265]).
> {code}
> 2023-02-20T03:43:55.4546050Z Feb 20 03:43:55 [ERROR] Picked up 
> JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError
> 2023-02-20T03:43:58.0448506Z Feb 20 03:43:58 [INFO] Running 
> org.apache.flink.test.scheduling.AdaptiveSchedulerITCase
> 2023-02-20T04:33:38.6824634Z Feb 20 04:33:38 [INFO] Tests run: 6, Failures: 
> 0, Errors: 0, Skipped: 0, Time elapsed: 2,980.445 s - in 
> org.apache.flink.test.scheduling.AdaptiveSchedulerITCase
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27169) PartiallyFinishedSourcesITCase.test hangs on azure

2023-03-08 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-27169:
--
Fix Version/s: 1.15.4
   (was: 1.15.5)

> PartiallyFinishedSourcesITCase.test hangs on azure
> --
>
> Key: FLINK-27169
> URL: https://issues.apache.org/jira/browse/FLINK-27169
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.15.3
>Reporter: Yun Gao
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.0, 1.15.4
>
>
> {code:java}
> Apr 10 08:32:18 "main" #1 prio=5 os_prio=0 tid=0x7f553400b800 nid=0x8345 
> waiting on condition [0x7f553be6]
> Apr 10 08:32:18java.lang.Thread.State: TIMED_WAITING (sleeping)
> Apr 10 08:32:18   at java.lang.Thread.sleep(Native Method)
> Apr 10 08:32:18   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> Apr 10 08:32:18   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:138)
> Apr 10 08:32:18   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitForSubtasksToFinish(CommonTestUtils.java:291)
> Apr 10 08:32:18   at 
> org.apache.flink.runtime.operators.lifecycle.TestJobExecutor.waitForSubtasksToFinish(TestJobExecutor.java:226)
> Apr 10 08:32:18   at 
> org.apache.flink.runtime.operators.lifecycle.PartiallyFinishedSourcesITCase.test(PartiallyFinishedSourcesITCase.java:138)
> Apr 10 08:32:18   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Apr 10 08:32:18   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Apr 10 08:32:18   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Apr 10 08:32:18   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 10 08:32:18   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Apr 10 08:32:18   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Apr 10 08:32:18   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Apr 10 08:32:18   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Apr 10 08:32:18   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Apr 10 08:32:18   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Apr 10 08:32:18   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> Apr 10 08:32:18   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Apr 10 08:32:18   at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Apr 10 08:32:18   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Apr 10 08:32:18   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Apr 10 08:32:18   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Apr 10 08:32:18   at org.junit.runners.Suite.runChild(Suite.java:128)
> Apr 10 08:32:18   at org.junit.runners.Suite.runChild(Suite.java:27)
> Apr 10 08:32:18   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34484=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=6757



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29729) Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader

2023-03-08 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-29729:
--
Fix Version/s: 1.18.0

> Fix credential info configured in flink-conf.yaml is lost during creating 
> ParquetReader
> ---
>
> Key: FLINK-29729
> URL: https://issues.apache.org/jira/browse/FLINK-29729
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Rascal Wu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2, 1.18.0
>
> Attachments: image-2022-10-22-17-41-38-084.png
>
>
> Hi, I'm wondering if we can include the configured properties from 
> flink-conf.yaml, in addition to the Hadoop configuration, when creating the 
> ParquetReader in `ParquetVectorizedInputFormat`.
>  
> I have a use case where I want to query a table in an S3 bucket with Parquet 
> format via the filesystem connector, and I configured the AWS credential info 
> in `flink-conf.yaml`, e.g. fs.s3a.access.key, fs.s3a.secret.key, etc.
>  
> The JobManager (SourceCoordinator) handles "getFileStatus" on the S3 objects 
> and generates splits fine, but the TaskManager (SourceOperator -> 
> ParquetVectorizedInputFormat -> ParquetReader) does not work because the AWS 
> credential info is missing.
>  
> After analyzing the source code that creates the ParquetReader to read the 
> footer, I found that the AWS credential info is not passed when the 
> S3AFileSystem is created and initialized; the details are shown in the 
> snapshot below.  !image-2022-10-22-17-41-38-084.png!
>  
> The `hadoopConfig` only contains the properties from the table format options 
> and the default Hadoop properties from core-site.xml, hdfs-site.xml, etc., 
> because the `hadoopConfig` is injected by 
> `ParquetFileFormatFactory#createRuntimeDecoder` -> 
> `ParquetColumnarRowInputFormat.createPartitionedFormat` -> 
> `ParquetFileFormatFactory.generateParquetConfiguration`.
>  
> {code:java}
> @Override
> public BulkFormat createRuntimeDecoder(
> DynamicTableSource.Context sourceContext,
> DataType producedDataType,
> int[][] projections) {
> return ParquetColumnarRowInputFormat.createPartitionedFormat(
> getParquetConfiguration(formatOptions),
> (RowType) 
> Projection.of(projections).project(producedDataType).getLogicalType(),
> sourceContext.createTypeInformation(producedDataType),
> Collections.emptyList(),
> null,
> VectorizedColumnBatch.DEFAULT_SIZE,
> formatOptions.get(UTC_TIMEZONE),
> true);
> }
>  
> private static Configuration getParquetConfiguration(ReadableConfig options) {
> Configuration conf = new Configuration();
> Properties properties = new Properties();
> ((org.apache.flink.configuration.Configuration) 
> options).addAllToProperties(properties);
> properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
> return conf;
> }
> {code}
>  
> I know that I can add the AWS credential info to core-site.xml or 
> hdfs-site.xml so that the `ParquetReader` can get the credentials, but I 
> don't think that is good practice, especially when different Flink jobs use 
> different AWS credentials. So I'm wondering if we can combine the default 
> Hadoop configuration (static) with the properties from `flink-conf.yaml` 
> (dynamic) when creating the `ParquetReader`. 
> For example, similar to what this PR does: 
> https://github.com/apache/flink/pull/21130
>  
> BTW, I'm using Flink 1.15.1 in a standalone cluster to validate the whole 
> process, but I think this problem is not limited to 1.15.1, and not limited 
> to accessing objects/files in an AWS S3 bucket; any other cloud object 
> storage might hit it as well.
>  
> Besides changing the code, is there any other solution that can help me 
> handle this problem? Thanks. 
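A minimal sketch of the kind of merge the reporter is asking for, assuming the factory had access to the cluster-level Flink configuration; the method shape and the "parquet" prefix constant mirror the snippet quoted above and are not the actual ParquetFileFormatFactory code:

{code:java}
import java.util.Properties;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.hadoop.conf.Configuration;

public final class ParquetConfMergeSketch {

    // Assumed to mirror the IDENTIFIER prefix ("parquet") used in the snippet above.
    private static final String FORMAT_PREFIX = "parquet";

    public static Configuration getParquetConfiguration(
            ReadableConfig formatOptions, org.apache.flink.configuration.Configuration flinkConf) {
        Configuration conf = new Configuration();
        // 1) Cluster-level options from flink-conf.yaml (e.g. fs.s3a.access.key).
        flinkConf.toMap().forEach(conf::set);
        // 2) Per-table format options, prefixed as in the original method.
        Properties properties = new Properties();
        ((org.apache.flink.configuration.Configuration) formatOptions).addAllToProperties(properties);
        properties.forEach((k, v) -> conf.set(FORMAT_PREFIX + "." + k, v.toString()));
        return conf;
    }
}
{code}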



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31272) Duplicate operators appear in the StreamGraph for Python DataStream API jobs

2023-03-08 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-31272:
--
Fix Version/s: 1.15.4
   (was: 1.15.5)

> Duplicate operators appear in the StreamGraph for Python DataStream API jobs
> 
>
> Key: FLINK-31272
> URL: https://issues.apache.org/jira/browse/FLINK-31272
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
>
> For the following job:
> {code}
> import argparse
> import json
> import sys
> import time
> from typing import Iterable, cast
> from pyflink.common import Types, Time, Encoder
> from pyflink.datastream import StreamExecutionEnvironment, 
> ProcessWindowFunction, EmbeddedRocksDBStateBackend, \
> PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, 
> ExternalizedCheckpointCleanup
> from pyflink.datastream.connectors.file_system import FileSink, 
> RollingPolicy, OutputFileConfig
> from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
> from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, 
> TumblingProcessingTimeWindows, \
> ProcessingTimeTrigger
> class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):
> def __init__(self, window_size: int):
> self._window_size = window_size
> self._count_state_descriptor = ReducingStateDescriptor(
> "count", lambda a, b: a + b, Types.LONG())
> @staticmethod
> def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
> return CountWithProcessTimeoutTrigger(window_size)
> def on_element(self,
>element: T,
>timestamp: int,
>window: TimeWindow,
>ctx: 'Trigger.TriggerContext') -> TriggerResult:
> count_state = cast(ReducingState, 
> ctx.get_partitioned_state(self._count_state_descriptor))
> count_state.add(1)
> # print("element arrive:", element, "count_state:", 
> count_state.get(), window.max_timestamp(),
> #   ctx.get_current_watermark())
> if count_state.get() >= self._window_size:  # must fire
> print("fire element count", element, count_state.get(), 
> window.max_timestamp(),
>   ctx.get_current_watermark())
> count_state.clear()
> return TriggerResult.FIRE_AND_PURGE
> if timestamp >= window.end:
> count_state.clear()
> return TriggerResult.FIRE_AND_PURGE
> else:
> return TriggerResult.CONTINUE
> def on_processing_time(self,
>timestamp: int,
>window: TimeWindow,
>ctx: Trigger.TriggerContext) -> TriggerResult:
> if timestamp >= window.end:
> return TriggerResult.CONTINUE
> else:
> print("fire with process_time:", timestamp)
> count_state = cast(ReducingState, 
> ctx.get_partitioned_state(self._count_state_descriptor))
> count_state.clear()
> return TriggerResult.FIRE_AND_PURGE
> def on_event_time(self,
>   timestamp: int,
>   window: TimeWindow,
>   ctx: 'Trigger.TriggerContext') -> TriggerResult:
> return TriggerResult.CONTINUE
> def clear(self,
>   window: TimeWindow,
>   ctx: 'Trigger.TriggerContext') -> None:
> count_state = ctx.get_partitioned_state(self._count_state_descriptor)
> count_state.clear()
> def to_dict_map(v):
> time.sleep(1)
> dict_value = json.loads(v)
> return dict_value
> def get_group_key(value, keys):
> group_key_values = []
> for key in keys:
> one_key_value = 'null'
> if key in value:
> list_value = value[key]
> if list_value:
> one_key_value = str(list_value[0])
> group_key_values.append(one_key_value)
> group_key = '_'.join(group_key_values)
> # print("group_key=", group_key)
> return group_key
> class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, 
> TimeWindow]):
> def __init__(self, uf):
> self._user_function = uf
> def process(self,
> key: str,
> context: ProcessWindowFunction.Context[TimeWindow],
> elements: Iterable[dict]) -> Iterable[dict]:
> result_list = 
> self._user_function.process_after_group_by_function(elements)
> return result_list
> if __name__ == '__main__':
> parser = 

[jira] [Updated] (FLINK-29729) Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader

2023-03-08 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-29729:
--
Fix Version/s: 1.15.4
   (was: 1.18.0)
   (was: 1.15.5)

> Fix credential info configured in flink-conf.yaml is lost during creating 
> ParquetReader
> ---
>
> Key: FLINK-29729
> URL: https://issues.apache.org/jira/browse/FLINK-29729
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Rascal Wu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: image-2022-10-22-17-41-38-084.png
>
>
> Hi, I'm wondering if we can include the configured properties from 
> flink-conf.yaml, in addition to the Hadoop configuration, when creating the 
> ParquetReader in `ParquetVectorizedInputFormat`.
>  
> I have a use case where I want to query a table in an S3 bucket with Parquet 
> format via the filesystem connector, and I configured the AWS credential info 
> in `flink-conf.yaml`, e.g. fs.s3a.access.key, fs.s3a.secret.key, etc.
>  
> The JobManager (SourceCoordinator) handles "getFileStatus" on the S3 objects 
> and generates splits fine, but the TaskManager (SourceOperator -> 
> ParquetVectorizedInputFormat -> ParquetReader) does not work because the AWS 
> credential info is missing.
>  
> After analyzing the source code that creates the ParquetReader to read the 
> footer, I found that the AWS credential info is not passed when the 
> S3AFileSystem is created and initialized; the details are shown in the 
> snapshot below.  !image-2022-10-22-17-41-38-084.png!
>  
> The `hadoopConfig` only contains the properties from the table format options 
> and the default Hadoop properties from core-site.xml, hdfs-site.xml, etc., 
> because the `hadoopConfig` is injected by 
> `ParquetFileFormatFactory#createRuntimeDecoder` -> 
> `ParquetColumnarRowInputFormat.createPartitionedFormat` -> 
> `ParquetFileFormatFactory.generateParquetConfiguration`.
>  
> {code:java}
> @Override
> public BulkFormat createRuntimeDecoder(
> DynamicTableSource.Context sourceContext,
> DataType producedDataType,
> int[][] projections) {
> return ParquetColumnarRowInputFormat.createPartitionedFormat(
> getParquetConfiguration(formatOptions),
> (RowType) 
> Projection.of(projections).project(producedDataType).getLogicalType(),
> sourceContext.createTypeInformation(producedDataType),
> Collections.emptyList(),
> null,
> VectorizedColumnBatch.DEFAULT_SIZE,
> formatOptions.get(UTC_TIMEZONE),
> true);
> }
>  
> private static Configuration getParquetConfiguration(ReadableConfig options) {
> Configuration conf = new Configuration();
> Properties properties = new Properties();
> ((org.apache.flink.configuration.Configuration) 
> options).addAllToProperties(properties);
> properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
> return conf;
> }
> {code}
>  
> I know that I can add the AWS credential info to core-site.xml or 
> hdfs-site.xml so that the `ParquetReader` can get the credentials, but I 
> don't think that is good practice, especially when different Flink jobs use 
> different AWS credentials. So I'm wondering if we can combine the default 
> Hadoop configuration (static) with the properties from `flink-conf.yaml` 
> (dynamic) when creating the `ParquetReader`. 
> For example, similar to what this PR does: 
> https://github.com/apache/flink/pull/21130
>  
> BTW, I'm using Flink 1.15.1 in a standalone cluster to validate the whole 
> process, but I think this problem is not limited to 1.15.1, and not limited 
> to accessing objects/files in an AWS S3 bucket; any other cloud object 
> storage might hit it as well.
>  
> Besides changing the code, is there any other solution that can help me 
> handle this problem? Thanks. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31286) Python processes are still alive when shutting down a session cluster directly without stopping the jobs

2023-03-08 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-31286:
--
Fix Version/s: 1.15.4
   (was: 1.15.5)

> Python processes are still alive when shutting down a session cluster 
> directly without stopping the jobs
> 
>
> Key: FLINK-31286
> URL: https://issues.apache.org/jira/browse/FLINK-31286
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: image-2023-03-02-10-55-34-863.png
>
>
> Reproduce steps:
> 1) start a standalone cluster: ./bin/start_cluster.sh
> 2) submit a PyFlink job which contains Python UDFs
> 3) stop the cluster: ./bin/stop_cluster.sh
> 4) Check if Python process still exists: ps aux | grep -i beam_boot
> !image-2023-03-02-10-55-34-863.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31372) Memory Leak in prometheus HTTPMetricHandler when reporting fails

2023-03-08 Thread Krzysztof Dziolak (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Dziolak updated FLINK-31372:
--
Summary: Memory Leak in prometheus HTTPMetricHandler when reporting fails  
(was: Memory Leak in HTTPMetricHandler when reporting fails)

> Memory Leak in prometheus HTTPMetricHandler when reporting fails
> 
>
> Key: FLINK-31372
> URL: https://issues.apache.org/jira/browse/FLINK-31372
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, kafka
>Affects Versions: 1.16.1, 1.15.4, 1.17.1
>Reporter: Krzysztof Dziolak
>Priority: Minor
>
> We've identified a memory leak that occurs when any of the metric reporters 
> fails with an exception. In such cases HTTPExchanges are not getting closed 
> properly in io.prometheus.client.exporter.HTTPServer.HTTPMetricHandler.
> In our case the failure was triggered by the use of an incompatible Kafka 
> client failing metric collection with:
> {{Exception in thread "prometheus-http-1-72873" java.lang.NoSuchMethodError: 
> 'double org.apache.kafka.common.Metric.value()'}}
> Should the Prometheus reporter handle metric collection defensively (by 
> suppressing exceptions) to guarantee metric delivery and avoid similar memory 
> leaks?
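A minimal, unverified sketch of the "defensive collection" idea raised above: wrap a registered Prometheus Collector so that a failing collect() is skipped instead of escaping into the HTTP handler. The class name is illustrative and this is not the reporter's actual code:

{code:java}
import io.prometheus.client.Collector;

import java.util.Collections;
import java.util.List;

public final class SuppressingCollector extends Collector {

    private final Collector delegate;

    public SuppressingCollector(Collector delegate) {
        this.delegate = delegate;
    }

    @Override
    public List<MetricFamilySamples> collect() {
        try {
            return delegate.collect();
        } catch (Throwable t) {
            // E.g. a NoSuchMethodError from an incompatible Kafka client:
            // drop this collector's samples for the current scrape instead of
            // failing the whole report and leaking the HTTPExchange.
            return Collections.emptyList();
        }
    }
}
{code}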



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31372) Memory Leak in HTTPMetricHandler when reporting fails

2023-03-08 Thread Krzysztof Dziolak (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Dziolak updated FLINK-31372:
--
Description: 
We've identified a memory leak that occurs when any of the metric reporters 
fails with an exception. In such cases HTTPExchanges are not getting closed 
properly in io.prometheus.client.exporter.HTTPServer.HTTPMetricHandler.

In our case the failure was triggered by the use of an incompatible Kafka 
client failing metric collection with:

{{Exception in thread "prometheus-http-1-72873" java.lang.NoSuchMethodError: 
'double org.apache.kafka.common.Metric.value()'}}

Should the Prometheus reporter handle metric collection defensively (by 
suppressing exceptions) to guarantee metric delivery and avoid similar memory 
leaks?

  was:
Basically I'm running flink at the 1.15.1 version with docker  and often the 
application start to slow down because of OOM errors. 
It was observed that the memory continued to increase, and the number of 
threads continued to increase through the mertics data collected by Prometheus。
I tried to remove the sink kafka code and it looks normal,so I change the flink 
to 1.14.5 and it works fine.
Is this a bug?


> Memory Leak in HTTPMetricHandler when reporting fails
> -
>
> Key: FLINK-31372
> URL: https://issues.apache.org/jira/browse/FLINK-31372
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, kafka
>Affects Versions: 1.16.1, 1.15.4, 1.17.1
>Reporter: Krzysztof Dziolak
>Priority: Minor
>
> We've identified a memory leak that occurs when any of the metric reporters 
> fails with an exception. In such cases HTTPExchanges are not getting closed 
> properly in io.prometheus.client.exporter.HTTPServer.HTTPMetricHandler.
> In our case the failure was triggered by the use of an incompatible Kafka 
> client failing metric collection with:
> {{Exception in thread "prometheus-http-1-72873" java.lang.NoSuchMethodError: 
> 'double org.apache.kafka.common.Metric.value()'}}
> Should the Prometheus reporter handle metric collection defensively (by 
> suppressing exceptions) to guarantee metric delivery and avoid similar memory 
> leaks?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31372) Memory Leak in HTTPMetricHandler when reporting fails

2023-03-08 Thread Krzysztof Dziolak (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Dziolak updated FLINK-31372:
--
Affects Version/s: 1.16.1
   1.15.4
   1.17.1
   (was: 1.15.0)
   (was: 1.15.1)

> Memory Leak in HTTPMetricHandler when reporting fails
> -
>
> Key: FLINK-31372
> URL: https://issues.apache.org/jira/browse/FLINK-31372
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, kafka
>Affects Versions: 1.16.1, 1.15.4, 1.17.1
>Reporter: Krzysztof Dziolak
>Priority: Minor
>
> Basically, I'm running Flink 1.15.1 with Docker, and the application often 
> starts to slow down because of OOM errors. 
> The metrics collected by Prometheus show that the memory usage and the number 
> of threads kept increasing.
> I tried removing the Kafka sink code and it looked normal, so I switched 
> Flink to 1.14.5 and it works fine.
> Is this a bug?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31372) Memory Leak in HTTPMetricHandler when reporting fails

2023-03-08 Thread Krzysztof Dziolak (Jira)
Krzysztof Dziolak created FLINK-31372:
-

 Summary: Memory Leak in HTTPMetricHandler when reporting fails
 Key: FLINK-31372
 URL: https://issues.apache.org/jira/browse/FLINK-31372
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, kafka
Affects Versions: 1.15.0, 1.15.1
Reporter: Krzysztof Dziolak


Basically, I'm running Flink 1.15.1 with Docker, and the application often 
starts to slow down because of OOM errors. 
The metrics collected by Prometheus show that the memory usage and the number 
of threads kept increasing.
I tried removing the Kafka sink code and it looked normal, so I switched Flink 
to 1.14.5 and it works fine.
Is this a bug?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31372) Memory Leak in HTTPMetricHandler when reporting fails

2023-03-08 Thread Krzysztof Dziolak (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Dziolak updated FLINK-31372:
--
Priority: Minor  (was: Major)

> Memory Leak in HTTPMetricHandler when reporting fails
> -
>
> Key: FLINK-31372
> URL: https://issues.apache.org/jira/browse/FLINK-31372
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, kafka
>Affects Versions: 1.15.0, 1.15.1
>Reporter: Krzysztof Dziolak
>Priority: Minor
>
> Basically, I'm running Flink 1.15.1 with Docker, and the application often 
> starts to slow down because of OOM errors. 
> The metrics collected by Prometheus show that the memory usage and the number 
> of threads kept increasing.
> I tried removing the Kafka sink code and it looked normal, so I switched 
> Flink to 1.14.5 and it works fine.
> Is this a bug?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31371) Stream failure if the topic doesn't exist

2023-03-08 Thread Enzo Dechaene (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Enzo Dechaene updated FLINK-31371:
--
Description: 
*Describe the bug*
With a Pulsar 2.8.4 server, a Flink stream containing Pulsar sources or sinks 
will fail at startup if the topic doesn't exist.

 

*To Reproduce*
Create a stream with:
 * Flink 1.15.2
 * Pulsar 2.8.4
 * a Pulsar source or sink linked to a non-existent topic
 * Start the stream

 

*Expected behavior*
If the topic doesn't exist, it should be created at the first connection of the 
source or sink without error.

 

*Additional context*
In the TopicListSubscriber class of the connector, the method 
getSubscribedTopicPartitions() tries to get the metadata of a topic as follows:
 
{code:java}
TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);{code}
 

If the topic doesn't exist, I get a NullPointerException on the metadata.

We created a previous 
[ticket|https://github.com/streamnative/pulsar-flink/issues/366] on the Pulsar 
connector, and it was fixed.

  was:
*Describe the bug*
With a Pulsar 2.8.4 server, a Flink stream containing Pulsar sources or sinks 
will fail at startup if the topic doesn't exist.

 

*To Reproduce*
Create a stream with :
 * Flink 1.15.2
 * Pulsar 2.8.2
 * with a Pulsar source or sink linked to a non existant topic
 * Start the stream

 

*Expected behavior*
If the topic doesn't exist, it should be created at the first connection of the 
source or sink without error.

 

*Additional context*
In the TopicListSubscriber class of the connector, the method 
getSubscribedTopicPartitions() try to get the metadata of a topic by doing that 
:
 
{code:java}
TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);{code}
 

If the topic doesn't exist, I get a NullPointerException on the metadata

We created a previous 
[ticket|https://github.com/streamnative/pulsar-flink/issues/366] on the Pulsar 
connector and it was fixed


> Stream failure if the topic doesn't exist
> -
>
> Key: FLINK-31371
> URL: https://issues.apache.org/jira/browse/FLINK-31371
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.3
>Reporter: Enzo Dechaene
>Priority: Blocker
>
> *Describe the bug*
> With a Pulsar 2.8.4 server, a Flink stream containing Pulsar sources or sinks 
> will fail at startup if the topic doesn't exist.
>  
> *To Reproduce*
> Create a stream with :
>  * Flink 1.15.2
>  * Pulsar 2.8.4
>  * a Pulsar source or sink linked to a non-existent topic
>  * Start the stream
>  
> *Expected behavior*
> If the topic doesn't exist, it should be created at the first connection of 
> the source or sink without error.
>  
> *Additional context*
> In the TopicListSubscriber class of the connector, the method 
> getSubscribedTopicPartitions() tries to get the metadata of a topic as 
> follows:
>  
> {code:java}
> TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);{code}
>  
> If the topic doesn't exist, I get a NullPointerException on the metadata.
> We created a previous 
> [ticket|https://github.com/streamnative/pulsar-flink/issues/366] on the 
> Pulsar connector, and it was fixed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31371) Stream failure if the topic doesn't exist

2023-03-08 Thread Enzo Dechaene (Jira)
Enzo Dechaene created FLINK-31371:
-

 Summary: Stream failure if the topic doesn't exist
 Key: FLINK-31371
 URL: https://issues.apache.org/jira/browse/FLINK-31371
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.3
Reporter: Enzo Dechaene


*Describe the bug*
With a Pulsar 2.8.4 server, a Flink stream containing Pulsar sources or sinks 
will fail at startup if the topic doesn't exist.

 

*To Reproduce*
Create a stream with:
 * Flink 1.15.2
 * Pulsar 2.8.2
 * a Pulsar source or sink linked to a non-existent topic
 * Start the stream

 

*Expected behavior*
If the topic doesn't exist, it should be created at the first connection of the 
source or sink without error.

 

*Additional context*
In the TopicListSubscriber class of the connector, the method 
getSubscribedTopicPartitions() tries to get the metadata of a topic as follows:
 
{code:java}
TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);{code}
 

If the topic doesn't exist, I get a NullPointerException on the metadata.

We created a previous 
[ticket|https://github.com/streamnative/pulsar-flink/issues/366] on the Pulsar 
connector, and it was fixed.
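As an illustration only, the NullPointerException could be turned into an actionable error with a small guard around the call quoted above; the helper below is hypothetical (generic over the metadata type) and does not reflect actual connector code. One could also create the missing topic via the Pulsar admin client at this point, which is the behavior this ticket asks for.

{code:java}
public final class TopicMetadataGuard {

    private TopicMetadataGuard() {}

    /** Usage sketch: requireTopicExists(queryTopicMetadata(pulsarAdmin, topic), topic). */
    public static <M> M requireTopicExists(M metadata, String topic) {
        if (metadata == null) {
            // Fail fast with a descriptive message instead of an NPE further down.
            throw new IllegalStateException(
                    "Topic '" + topic + "' does not exist on the Pulsar cluster; "
                            + "create it (or enable topic auto-creation) before starting the stream.");
        }
        return metadata;
    }
}
{code}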



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31345) Trim autoscaler configMap to not exceed 1mb size limit

2023-03-08 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-31345.
--
Resolution: Fixed

Merged to main:
f88cbf3fd1b99a574a1ed8b8a2869b96d932e521
70bf6a9d920e9affadb253e7760db12d4e0dd554

> Trim autoscaler configMap to not exceed 1mb size limit
> --
>
> Key: FLINK-31345
> URL: https://issues.apache.org/jira/browse/FLINK-31345
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Maximilian Michels
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.5.0
>
>
> When the {{autoscaler-}} ConfigMap, which is used to persist 
> scaling decisions and metrics, becomes too large, the following error is 
> thrown consistently:
> {noformat}
> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [replace]  
> for kind: [ConfigMap]  with name: [deployment]  in namespace: [namespace]  
> failed.
>     at 
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:159)
>     at 
> io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:169)
>     at 
> io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:172)
>     at 
> io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:113)
>     at 
> io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:41)
>     at 
> io.fabric8.kubernetes.client.extension.ResourceAdapter.replace(ResourceAdapter.java:252)
>     at 
> org.apache.flink.kubernetes.operator.autoscaler.AutoScalerInfo.replaceInKubernetes(AutoScalerInfo.java:167)
>     at 
> org.apache.flink.kubernetes.operator.autoscaler.JobAutoScalerImpl.scale(JobAutoScalerImpl.java:113)
>     at 
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:178)
>     at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:130)
>     at 
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>     at 
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:145)
>     at 
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:103)
>     at 
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>     at 
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:102)
>     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>     at 
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>     at 
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.io.IOException: stream was reset: NO_ERROR
>     at 
> io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:514)
>     at 
> io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
>     at 
> io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleUpdate(OperationSupport.java:347)
>     at 
> io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleUpdate(BaseOperation.java:680)
>     at 
> io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:167)
>     ... 21 more
> Caused by: okhttp3.internal.http2.StreamResetException: stream was reset: 
> NO_ERROR
>     at 
> okhttp3.internal.http2.Http2Stream.checkOutNotClosed$okhttp(Http2Stream.kt:646)
>     at 
> okhttp3.internal.http2.Http2Stream$FramingSink.emitFrame(Http2Stream.kt:557)
>     at 
> okhttp3.internal.http2.Http2Stream$FramingSink.write(Http2Stream.kt:532)
>     at okio.ForwardingSink.write(ForwardingSink.kt:29)
>     at 
> 

[GitHub] [flink-kubernetes-operator] gyfora merged pull request #547: [FLINK-31345] Reduce AutoScalerInfo size by rounding metrics and compression

2023-03-08 Thread via GitHub


gyfora merged PR #547:
URL: https://github.com/apache/flink-kubernetes-operator/pull/547
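A minimal sketch of the compression half of that change (gzip plus Base64 before the value is stored in the ConfigMap); the class and method names are illustrative, not the operator's actual AutoScalerInfo code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

public final class ConfigMapValueCompressor {

    private ConfigMapValueCompressor() {}

    public static String compress(String metricHistoryJson) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(metricHistoryJson.getBytes(StandardCharsets.UTF_8));
        }
        // ConfigMap values are strings, so the compressed bytes are Base64-encoded,
        // which keeps typical metric histories well under the ~1 MB object size limit.
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }
}
```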


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on pull request #22134: [FLINK-31370] Prevent more timers from being fired if the StreamTask…

2023-03-08 Thread via GitHub


dmvk commented on PR #22134:
URL: https://github.com/apache/flink/pull/22134#issuecomment-1460402364

   Thanks for the review, @pnowojski! I've added the same logic for the 
processing time timers. It would be super helpful if you could verify the 
change against benchmarks.
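   A generic, purely illustrative sketch of the guard being discussed (not the actual StreamTask or timer-service code): stop draining due timers once the task signals it should no longer fire them.
   
   ```java
   import java.util.PriorityQueue;
   import java.util.function.BooleanSupplier;
   import java.util.function.LongConsumer;
   
   public final class TimerFiringSketch {
   
       /** Fires all timers due at {@code time}, but bails out once {@code shouldStopFiring} is true. */
       public static void advanceTo(
               long time,
               PriorityQueue<Long> timers,
               LongConsumer onTimer,
               BooleanSupplier shouldStopFiring) {
           while (!timers.isEmpty() && timers.peek() <= time) {
               if (shouldStopFiring.getAsBoolean()) {
                   // Leave the remaining timers queued; the shutdown or
                   // cancellation path decides what happens to them.
                   return;
               }
               onTimer.accept(timers.poll());
           }
       }
   }
   ```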


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21452: [FLINK-30282] Fix Logical type ROW lost inner field's nullability after converting to RelDataType

2023-03-08 Thread via GitHub


snuyanzin commented on code in PR #21452:
URL: https://github.com/apache/flink/pull/21452#discussion_r1129506976


##
flink-table/flink-sql-client/src/test/resources/sql/table.q:
##
@@ -964,7 +998,7 @@ CREATE TABLE IF NOT EXISTS daily_orders (
  PRIMARY KEY(dt, `user`) NOT ENFORCED
 ) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
- 'path' = '$VAR_BATCH_PATH',
+ 'path' = 
'/var/folders/xd/9dp1y4vd3h56kjkvdk426l50gp/T/junit5781726749537647921/7bf278b4-9932-4e24-af7b-1126df4667405030013249695173150',

Review Comment:
   This I didn't get... Can you please elaborate: where does this magic name 
come from?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RyanSkraba commented on a diff in pull request #22082: [FLINK-31300][table] TRY_CAST for constructed types

2023-03-08 Thread via GitHub


RyanSkraba commented on code in PR #22082:
URL: https://github.com/apache/flink/pull/22082#discussion_r1129649372


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRule.java:
##
@@ -46,7 +46,9 @@ CastExecutor create(
 Context context, LogicalType inputLogicalType, LogicalType 
targetLogicalType);
 
 /** Returns true if the {@link CastExecutor} can fail at runtime. */
-boolean canFail(LogicalType inputLogicalType, LogicalType 
targetLogicalType);

Review Comment:
   Yes, this LGTM.
   
   I checked: `ConstructedToConstructedCastRule` is the only other 
**interface** that attempts to provide a `canFail` default implementation, and 
there is no other abstract **class** that provides an implementation, so this 
change should provide the correct implementation to `ArrayToArrayCastRule`, or 
`MapToMapAndMultisetToMultisetCastRule`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-03-08 Thread via GitHub


liuyongvs commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1129607127


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:
##
@@ -42,7 +42,7 @@ public Optional<DataType> inferArgumentType(
 CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final ArrayType haystackType =
 (ArrayType) 
callContext.getArgumentDataTypes().get(0).getLogicalType();
-final LogicalType haystackElementType = haystackType.getElementType();
+final LogicalType haystackElementType = 
haystackType.getElementType().copy(true);

Review Comment:
   Yep, you are right. Thanks a lot. I also added a unit test for the 2nd 
case, in case others modify this logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-08 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1129605975


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##
@@ -1966,61 +1982,60 @@ private Operation convertAlterTableModifyCols(
 }
 
 ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
-CatalogTable oldTable = (CatalogTable) alteredTable;
+ResolvedCatalogTable oldTable =
+catalogManager.resolveCatalogTable((CatalogTable) 
alteredTable);
 
 // prepare properties
 Map<String, String> props = new HashMap<>(oldTable.getOptions());
 props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name());
 if (isCascade) {
 props.put(ALTER_COL_CASCADE, "true");
 }
-TableSchema oldSchema = oldTable.getSchema();
+ResolvedSchema oldSchema = oldTable.getResolvedSchema();
 final int numPartCol = oldTable.getPartitionKeys().size();
-TableSchema.Builder builder = TableSchema.builder();
 // add existing non-part col if we're not replacing
+List<Column> newColumns = new ArrayList<>();
 if (!replace) {
-List<TableColumn> nonPartCols =
-oldSchema.getTableColumns().subList(0, 
oldSchema.getFieldCount() - numPartCol);
-for (TableColumn column : nonPartCols) {
-builder.add(column);
-}
-setWatermarkAndPK(builder, oldSchema);
+List<Column> nonPartCols =
+oldSchema.getColumns().subList(0, 
oldSchema.getColumnCount() - numPartCol);
+
+newColumns.addAll(nonPartCols);
 }
 // add new cols
 for (FieldSchema col : newCols) {
-builder.add(
-TableColumn.physical(
+newColumns.add(
+Column.physical(
 col.getName(),
 HiveTypeUtil.toFlinkType(
 
TypeInfoUtils.getTypeInfoFromTypeString(col.getType()))));
 }
 // add part cols
-List<TableColumn> partCols =
+List<Column> partCols =
 oldSchema
-.getTableColumns()
-.subList(oldSchema.getFieldCount() - numPartCol, 
oldSchema.getFieldCount());
-for (TableColumn column : partCols) {
-builder.add(column);
+.getColumns()
+.subList(
+oldSchema.getColumnCount() - numPartCol,
+oldSchema.getColumnCount());
+newColumns.addAll(partCols);
+ResolvedSchema newSchema;
+if (!replace) {
+newSchema =
+new ResolvedSchema(
+newColumns,
+oldSchema.getWatermarkSpecs(),
+oldSchema.getPrimaryKey().orElse(null));
+} else {
+newSchema = ResolvedSchema.of(newColumns);
 }
 return new AlterTableSchemaOperation(
 tableIdentifier,
-new CatalogTableImpl(
-builder.build(),
-oldTable.getPartitionKeys(),
-props,
-oldTable.getComment()));
-}
-
-private static void setWatermarkAndPK(TableSchema.Builder builder, 
TableSchema schema) {
-for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) {
-builder.watermark(watermarkSpec);
-}
-schema.getPrimaryKey()
-.ifPresent(
-pk -> {
-builder.primaryKey(
-pk.getName(), pk.getColumns().toArray(new 
String[0]));
-});
+new ResolvedCatalogTable(
+CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(newSchema).build(),
+oldTable.getComment(),
+oldTable.getPartitionKeys(),
+props),
+newSchema));
 }

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] pnowojski commented on a diff in pull request #22134: [FLINK-31370] Prevent more timers from being fired if the StreamTask…

2023-03-08 Thread via GitHub


pnowojski commented on code in PR #22134:
URL: https://github.com/apache/flink/pull/22134#discussion_r1129592490


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##
@@ -296,7 +302,9 @@ public void advanceWatermark(long time) throws Exception {
 
 InternalTimer<K, N> timer;
 
-while ((timer = eventTimeTimersQueue.peek()) != null && 
timer.getTimestamp() <= time) {
+while ((timer = eventTimeTimersQueue.peek()) != null
+&& timer.getTimestamp() <= time
+&& !cancellationContext.isCancelled()) {

Review Comment:
   I would add the same check 10 lines above in the `onProcessingTime()` 
method (and a test for it).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on a diff in pull request #21522: [FLINK-29585][hive] Migrate TableSchema to Schema for Hive connector

2023-03-08 Thread via GitHub


Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1129582760


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java:
##
@@ -2106,7 +2111,7 @@ public static CatalogBaseTable getCatalogBaseTable(
 public static class TableSpec {
 public ObjectIdentifier tableIdentifier;
 public String tableName;
-public CatalogBaseTable table;
+public ResolvedCatalogBaseTable table;

Review Comment:
   I think we need the table to be resolved since we want to call 
`validatePartColumnType` on it, which requires the resolved type information. 
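   For illustration, a small sketch of the resolution step this relies on, as 
far as I understand the catalog API (treat the exact calls as an assumption):

```java
import java.util.List;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;

final class ResolutionSketch {
    // Resolving the table is what turns unresolved column declarations into
    // Columns with concrete DataTypes that a partition-column type check can inspect.
    static List<Column> resolvedColumns(CatalogManager catalogManager, CatalogBaseTable table) {
        ResolvedCatalogBaseTable<?> resolved = catalogManager.resolveCatalogBaseTable(table);
        return resolved.getResolvedSchema().getColumns();
    }
}
```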



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] echauchot commented on pull request #22124: [FLINK-30805] Ask for a new split when one has finished in SourceReader default implementation

2023-03-08 Thread via GitHub


echauchot commented on PR #22124:
URL: https://github.com/apache/flink/pull/22124#issuecomment-1460260004

   @MartijnVisser CI tests pass


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30805) SplitEnumerator#handleSplitRequest() should be called automatically

2023-03-08 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697925#comment-17697925
 ] 

Etienne Chauchot commented on FLINK-30805:
--

[~MartijnVisser] can you assign this ticket to me?

> SplitEnumerator#handleSplitRequest() should be called automatically
> ---
>
> Key: FLINK-30805
> URL: https://issues.apache.org/jira/browse/FLINK-30805
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
>
> SplitEnumerator#handleSplitRequest() is not called automatically by the new 
> source framework, which could be surprising to a source author. Right now a 
> source author has to call it themselves when a split is finished, or early 
> when the reader gets created. 
> IMHO it would be good if we could find a way for the framework to call it 
> automatically when a split is finished.
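A minimal sketch of the manual step described above, roughly as a source author 
would write it today (the reader class and split-state type are placeholders; 
`SourceReaderContext#sendSplitRequest()` is the call in question):

```java
import java.util.Map;
import org.apache.flink.api.connector.source.SourceReaderContext;

// Placeholder reader showing only the split-finished hook: today the author asks
// for the next split explicitly; the ticket proposes the framework do this itself.
final class SplitRequestingSketch {
    private final SourceReaderContext context;

    SplitRequestingSketch(SourceReaderContext context) {
        this.context = context;
    }

    // Mirrors the shape of SourceReaderBase#onSplitFinished(Map<String, SplitStateT>).
    void onSplitFinished(Map<String, ?> finishedSplitIds) {
        context.sendSplitRequest();
    }
}
```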



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] snuyanzin commented on a diff in pull request #21452: [FLINK-30282] Fix Logical type ROW lost inner field's nullability after converting to RelDataType

2023-03-08 Thread via GitHub


snuyanzin commented on code in PR #21452:
URL: https://github.com/apache/flink/pull/21452#discussion_r1129506976


##
flink-table/flink-sql-client/src/test/resources/sql/table.q:
##
@@ -964,7 +998,7 @@ CREATE TABLE IF NOT EXISTS daily_orders (
  PRIMARY KEY(dt, `user`) NOT ENFORCED
 ) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
- 'path' = '$VAR_BATCH_PATH',
+ 'path' = 
'/var/folders/xd/9dp1y4vd3h56kjkvdk426l50gp/T/junit5781726749537647921/7bf278b4-9932-4e24-af7b-1126df4667405030013249695173150',

Review Comment:
   this I didn't get...
   where does this magic name come from?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-03-08 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1129483531


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:
##
@@ -42,7 +42,7 @@ public Optional<DataType> inferArgumentType(
 CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final ArrayType haystackType =
 (ArrayType) 
callContext.getArgumentDataTypes().get(0).getLogicalType();
-final LogicalType haystackElementType = haystackType.getElementType();
+final LogicalType haystackElementType = 
haystackType.getElementType().copy(true);

Review Comment:
   It shouldn't always be nullable.
   I see 3 cases:
   1. `haystackType.getElementType()` is already nullable => no need to force 
nullability
   2. `haystackType.getElementType()` is not nullable and `needleType` is not 
nullable => no need to force nullability
   3. `haystackType.getElementType()` is not nullable and `needleType` is 
nullable => nullability needs to be forced.
   
   Right now it does some unneeded work for the first case and breaks the 2nd 
case.
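   A compact way to express that case analysis (a hypothetical helper, not the 
PR's code), assuming the needle's nullability is known at this point:

```java
import org.apache.flink.table.types.logical.LogicalType;

final class ElementNullabilitySketch {
    // Only case 3 widens the element type: the needle is nullable but the
    // haystack's element type is not. Cases 1 and 2 keep the element type as-is.
    static LogicalType expectedElementType(LogicalType elementType, LogicalType needleType) {
        return needleType.isNullable() && !elementType.isNullable()
                ? elementType.copy(true)
                : elementType;
    }
}
```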
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22134: [FLINK-31370] Prevent more timers from being fired if the StreamTask…

2023-03-08 Thread via GitHub


flinkbot commented on PR #22134:
URL: https://github.com/apache/flink/pull/22134#issuecomment-1460212176

   
   ## CI report:
   
   * 4eac832e73fb0a8ca8ee44338f9ff30ca0dcfe34 UNKNOWN
   
   
   Bot commands:
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #21993: [FLINK-31166][table] Fix array_contains does not support null argumen…

2023-03-08 Thread via GitHub


snuyanzin commented on code in PR #21993:
URL: https://github.com/apache/flink/pull/21993#discussion_r1129483531


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementArgumentTypeStrategy.java:
##
@@ -42,7 +42,7 @@ public Optional<DataType> inferArgumentType(
 CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final ArrayType haystackType =
 (ArrayType) 
callContext.getArgumentDataTypes().get(0).getLogicalType();
-final LogicalType haystackElementType = haystackType.getElementType();
+final LogicalType haystackElementType = 
haystackType.getElementType().copy(true);

Review Comment:
   It shouldn't always be nullable.
   I see 3 cases:
   1. `haystackType.getElementType()` is already nullable => no need to force 
nullability
   2. `haystackType.getElementType()` is not nullable and `needleType` is not 
nullable => no need to force nullability
   3. `haystackType.getElementType()` is not nullable and `needleType` is 
nullable => nullability needs to be forced.
   
   Right now it breaks the first 2 cases.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31370) Cancellation of the StreamTask should prevent more timers from being fired

2023-03-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31370:
---
Labels: pull-request-available  (was: )

> Cancellation of the StreamTask should prevent more timers from being fired
> --
>
> Key: FLINK-31370
> URL: https://issues.apache.org/jira/browse/FLINK-31370
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Major
>  Labels: pull-request-available
>
> If the task is canceled while the watermark progresses, it may be stuck in 
> the Cancelling state for a long time (e.g., when many windows are firing). 
> This is closely related to FLINK-20217, which might bring a more robust 
> solution for checkpoint and cancellation code paths.
> As a stopgap solution, we'll introduce a check allowing InternalTimerService 
> to break out of the firing loop if the StreamTask has been marked as canceled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dmvk opened a new pull request, #22134: [FLINK-31370] Prevent more timers from being fired if the StreamTask…

2023-03-08 Thread via GitHub


dmvk opened a new pull request, #22134:
URL: https://github.com/apache/flink/pull/22134

   https://issues.apache.org/jira/browse/FLINK-31370
   
   If the task is canceled while the watermark progresses, it may be stuck in 
the Cancelling state for a long time (e.g., when many windows are firing). This 
is closely related to 
[FLINK-20217](https://issues.apache.org/jira/browse/FLINK-20217), which might 
bring a more robust solution for checkpoint and cancellation code paths.
   
   As a stopgap solution, we'll introduce a check allowing InternalTimerService 
to break out of the firing loop if the StreamTask has been marked as canceled.
   
   
   *Performance consideration*: This adds a lookup to a volatile field on the 
per-timer code path.
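   As a rough sketch of how such a cancellation hook can be wired (the names 
below are placeholders, not necessarily the ones used in the PR): the task 
exposes its cancellation state through a tiny context object, and the timer 
service reads one volatile field per fired timer, which is the overhead noted 
above.

```java
// Hypothetical wiring, not Flink's actual classes.
@FunctionalInterface
interface CancellationContextSketch {
    boolean isCancelled();
}

final class TaskCancellationState {
    private volatile boolean canceled;

    void markCanceled() {
        canceled = true;
    }

    // Hand this to the timer service; a volatile read per timer is the only cost.
    CancellationContextSketch asContext() {
        return () -> canceled;
    }
}
```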


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31370) Cancellation of the StreamTask should prevent more timers from being fired

2023-03-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-31370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek updated FLINK-31370:
--
Description: 
If the task is canceled while the watermark progresses, it may be stuck in the 
Cancelling state for a long time (e.g., when many windows are firing). This is 
closely related to FLINK-20217, which might bring a more robust solution for 
checkpoint and cancellation code paths.

As a stopgap solution, we'll introduce a check allowing InternalTimerService to 
break out of the firing loop if the StreamTask has been marked as canceled.

  was:If the task is being canceled while the watermark is progressing, it may 
be stuck in the Cancelling state for a long time (e.g. when there are many 
windows firing). This is closely related to 


> Cancellation of the StreamTask should prevent more timers from being fired
> --
>
> Key: FLINK-31370
> URL: https://issues.apache.org/jira/browse/FLINK-31370
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Major
>
> If the task is canceled while the watermark progresses, it may be stuck in 
> the Cancelling state for a long time (e.g., when many windows are firing). 
> This is closely related to FLINK-20217, which might bring a more robust 
> solution for checkpoint and cancellation code paths.
> As a stopgap solution, we'll introduce a check allowing InternalTimerService 
> to break out of the firing loop if the StreamTask has been marked as canceled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31370) Cancellation of the StreamTask should prevent more timers from being fired

2023-03-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-31370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek updated FLINK-31370:
--
Description: If the task is being canceled while the watermark is 
progressing, it may be stuck in the Cancelling state for a long time (e.g. when 
there are many windows firing). This is closely related to 

> Cancellation of the StreamTask should prevent more timers from being fired
> --
>
> Key: FLINK-31370
> URL: https://issues.apache.org/jira/browse/FLINK-31370
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Major
>
> If the task is being canceled while the watermark is progressing, it may be 
> stuck in the Cancelling state for a long time (e.g. when there are many 
> windows firing). This is closely related to 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31370) Cancellation of the StreamTask should prevent more timers from being fired

2023-03-08 Thread Jira
David Morávek created FLINK-31370:
-

 Summary: Cancellation of the StreamTask should prevent more timers 
from being fired
 Key: FLINK-31370
 URL: https://issues.apache.org/jira/browse/FLINK-31370
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: David Morávek
Assignee: David Morávek






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] echauchot commented on pull request #22124: [FLINK-30805] Ask for a new split when one has finished in SourceReader default implementation

2023-03-08 Thread via GitHub


echauchot commented on PR #22124:
URL: https://github.com/apache/flink/pull/22124#issuecomment-1460166102

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] LadyForest commented on pull request #21452: [FLINK-30282] Fix Logical type ROW lost inner field's nullability after converting to RelDataType

2023-03-08 Thread via GitHub


LadyForest commented on PR #21452:
URL: https://github.com/apache/flink/pull/21452#issuecomment-1460103874

   Hi @snuyanzin, would you mind taking a look? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31368) Move operation execution logic out from TableEnvironmentImpl

2023-03-08 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697868#comment-17697868
 ] 

lincoln lee commented on FLINK-31368:
-

[~jark] +1 for this refactoring; it will prevent `TableEnvironmentImpl` from 
becoming even more bloated and will improve readability and code 
maintainability.

> Move operation execution logic out from TableEnvironmentImpl
> 
>
> Key: FLINK-31368
> URL: https://issues.apache.org/jira/browse/FLINK-31368
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Priority: Major
>
> Currently, {{TableEnvironmentImpl}} is a bit bloated. The implementation of 
> {{TableEnvironmentImpl}} is filled with lots of operation execution logic 
> which makes the class hard to read and maintain. Once you want to add/update 
> an operation, you have to touch the {{TableEnvironmentImpl}}, which is 
> unnecessary and not clean. 
> An improvement idea is to extract the operation execution logic (mainly the 
> command operation, which doesn't trigger a Flink job) out from 
> {{TableEnvironmentImpl}} and put it close to the corresponding operation. 
> This is how Spark does with {{RunnableCommand}} and {{V2CommandExec}}. In 
> this way, we only need to add a new class of {{Operation}} without modifying 
> {{TableEnvironmentImpl}} to support a new command.
> This is just an internal refactoring that doesn't affect user APIs and is 
> backward-compatible. 
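A hypothetical sketch of the direction described above (all names invented for 
illustration; this is neither Flink's nor Spark's actual API): each command-style 
operation carries its own execution logic, so the environment only dispatches.

```java
import java.util.List;

// Invented names for illustration only.
interface ExecutableOperationSketch {
    List<String> execute(CatalogViewSketch catalog);
}

interface CatalogViewSketch {
    List<String> listTables();
}

// A SHOW TABLES-like command owns its execution instead of the table
// environment holding a large if/else over operation types.
final class ShowTablesOperationSketch implements ExecutableOperationSketch {
    @Override
    public List<String> execute(CatalogViewSketch catalog) {
        return catalog.listTables();
    }
}
```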



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

