[GitHub] [flink] salvalcantara commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing

2022-10-30 Thread GitBox


salvalcantara commented on code in PR #21186:
URL: https://github.com/apache/flink/pull/21186#discussion_r1009072959


##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws 
Exception {
 assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
 assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+// elements that cannot be serialized should be silently skipped
+writer.write(null, SINK_WRITER_CONTEXT);
+timeService.trigger();
+assertThat(numBytesOut.getCount()).isEqualTo(0L);
+assertThat(numRecordsOut.getCount()).isEqualTo(0);
+assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
+assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+
+// but properly serialized elements should count just normally

Review Comment:
   I considered that too; indeed, that was my initial plan:
   
   - Add another dummy serializer, e.g., `NullRecordSerializer`, which simply 
returns `null` regardless of the input element
   - Add a separate test, e.g., `testWriteNullProducerRecord`, along the lines 
of your suggestion
   
   In the end, I didn't do so because it seemed simpler and at the same time 
more realistic to have the current test 
(`testIncreasingRecordBasedCounters`) check that we get the counting right when 
interleaving both valid and invalid elements (using the same 
`DummyRecordSerializer`), as opposed to checking those cases in isolation. 
Also, the new, separate test would essentially be a clone of the existing one, 
plus require yet another `createWriterWithConfiguration` method/override that 
considers the new dummy serializer instead.
   
   In summary, I think `testIncreasingRecordBasedCounters` is actually a good 
spot for checking the new functionality. Having said that, we can of course 
create a separate test if my arguments are not convincing enough. Whatever we 
decide!
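   
   For reference, a minimal sketch of what such a dummy serializer could look like 
(hypothetical, not part of this PR; the interface and `serialize` signature are 
assumed to match the Kafka sink API already used by `DummyRecordSerializer` in this 
test class):
   
   ```java
   import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
   import org.apache.kafka.clients.producer.ProducerRecord;
   
   /** Always returns null, so the writer is expected to skip the element. */
   class NullRecordSerializer implements KafkaRecordSerializationSchema<Integer> {
       @Override
       public ProducerRecord<byte[], byte[]> serialize(
               Integer element, KafkaSinkContext context, Long timestamp) {
           // A null ProducerRecord means "nothing to write" for this element.
           return null;
       }
   }
   ```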



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] leonardBang commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing

2022-10-30 Thread GitBox


leonardBang commented on code in PR #21186:
URL: https://github.com/apache/flink/pull/21186#discussion_r1009076348


##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws 
Exception {
 assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
 assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+// elements that cannot be serialized should be silently skipped
+writer.write(null, SINK_WRITER_CONTEXT);
+timeService.trigger();
+assertThat(numBytesOut.getCount()).isEqualTo(0L);
+assertThat(numRecordsOut.getCount()).isEqualTo(0);
+assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
+assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+
+// but properly serialized elements should count just normally

Review Comment:
   Thanks @salvalcantara for the detailed explanation. Adding to the current test 
checks both the metrics and the handling of a null ProducerRecord, so it 
makes sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29788) StatefulJobWBroadcastStateMigrationITCase failed in native savepoints

2022-10-30 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626426#comment-17626426
 ] 

Hangxiang Yu commented on FLINK-29788:
--

I have committed a PR to fix it in the test.


I think the release process could be improved here.

We could update the FlinkVersion of these classes at the very end, but the migration 
tests should be executed when we prepare for the release.

That would help us find migration exceptions early.

(Some exceptions may not be so simple.)
WDYT? [~hxbks2ks] 

> StatefulJobWBroadcastStateMigrationITCase failed in native savepoints
> -
>
> Key: FLINK-29788
> URL: https://issues.apache.org/jira/browse/FLINK-29788
> Project: Flink
>  Issue Type: Bug
>  Components: Release System, Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2022-10-28-11-18-45-471.png
>
>
>  !image-2022-10-28-11-18-45-471.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-10-30 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626425#comment-17626425
 ] 

Jark Wu commented on FLINK-29801:
-

I think this is a nice feature.  [~zhuzh] what do you think? 

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in OperatorCoordinator.
> In some cases, we may want to report metrics from the OperatorCoordinator, e.g., in 
> the Flink/Hudi integration, where some metadata is sent to the operator coordinator 
> to be committed to HDFS or HMS,
> but we also need to report some metrics from the operator coordinator for monitoring 
> purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29458) When two tables have the same field, do not specify the table name,Exception will be thrown:SqlValidatorException :Column 'currency' is ambiguous

2022-10-30 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-29458.
---
Fix Version/s: 1.17.0
   Resolution: Fixed

Fixed in master: 97fbb701314205fd1d51d7edb1f6ef7a27f880c7

> When two tables have the same field, do not specify the table name,Exception 
> will be thrown:SqlValidatorException :Column 'currency' is ambiguous
> -
>
> Key: FLINK-29458
> URL: https://issues.apache.org/jira/browse/FLINK-29458
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.14.4
>Reporter: ZuoYan
>Assignee: ZuoYan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2022-09-28-21-00-01-302.png, 
> image-2022-09-28-21-00-09-054.png, image-2022-09-28-21-00-22-733.png
>
>
> When two tables are joined and they have the same field, an exception will be 
> thrown if the table name is not specified in the SELECT query.
> Exception content:
> Column 'currency' is ambiguous.
> !image-2022-09-28-21-00-22-733.png!
>  
> !image-2022-09-28-21-00-01-302.png!
> !image-2022-09-28-21-00-09-054.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wuchong merged pull request #20983: [FLINK-29458][docs] When two tables have the same field, do not speci…

2022-10-30 Thread GitBox


wuchong merged PR #20983:
URL: https://github.com/apache/flink/pull/20983


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] masteryhx commented on pull request #21196: [FLINK-29788][state] Disable changelog temporally when SnapshotMigrationTestBase triggers native savepoint

2022-10-30 Thread GitBox


masteryhx commented on PR #21196:
URL: https://github.com/apache/flink/pull/21196#issuecomment-1296601937

   @HuangXingBo Could you also help to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21197: [FLINK-29801] OperatorCoordinator need open the way to operate metric…

2022-10-30 Thread GitBox


flinkbot commented on PR #21197:
URL: https://github.com/apache/flink/pull/21197#issuecomment-1296601625

   
   ## CI report:
   
   * beeb3741a5b0f156aa2ae30f595e7355ea2b5e4d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-10-30 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626423#comment-17626423
 ] 

yuemeng commented on FLINK-29801:
-

[~danny0405] Can you review this improvement? We need it to report some 
metrics in the stream write coordinator.

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in OperatorCoordinator.
> In some cases, we may want to report metrics from the OperatorCoordinator, e.g., in 
> the Flink/Hudi integration, where some metadata is sent to the operator coordinator 
> to be committed to HDFS or HMS,
> but we also need to report some metrics from the operator coordinator for monitoring 
> purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21196: [FLINK-29788][state] Disable changelog temporally when SnapshotMigrationTestBase triggers native savepoint

2022-10-30 Thread GitBox


flinkbot commented on PR #21196:
URL: https://github.com/apache/flink/pull/21196#issuecomment-1296596494

   
   ## CI report:
   
   * 72aee69358c17d1e00e921b36a0119716a17b556 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-10-30 Thread yuemeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626419#comment-17626419
 ] 

yuemeng commented on FLINK-29801:
-

[~jark] [~rmetzger] Can you review this improvement? We really need it 
in Hudi to report some metrics.

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in OperatorCoordinator.
> In some cases, we may want to report metrics from the OperatorCoordinator, e.g., in 
> the Flink/Hudi integration, where some metadata is sent to the operator coordinator 
> to be committed to HDFS or HMS,
> but we also need to report some metrics from the operator coordinator for monitoring 
> purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] masteryhx commented on pull request #21196: [FLINK-29788][state] Disable changelog temporally when SnapshotMigrationTestBase triggers native savepoint

2022-10-30 Thread GitBox


masteryhx commented on PR #21196:
URL: https://github.com/apache/flink/pull/21196#issuecomment-1296593973

   @klion26 @Myasuka Could you help to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-10-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29801:
---
Labels: pull-request-available  (was: )

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in OperatorCoordinator.
> In some cases, we may want to report metrics from the OperatorCoordinator, e.g., in 
> the Flink/Hudi integration, where some metadata is sent to the operator coordinator 
> to be committed to HDFS or HMS,
> but we also need to report some metrics from the operator coordinator for monitoring 
> purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] JerryYue-M opened a new pull request, #21197: [FLINK-29801] OperatorCoordinator need open the way to operate metric…

2022-10-30 Thread GitBox


JerryYue-M opened a new pull request, #21197:
URL: https://github.com/apache/flink/pull/21197

   …Group interface
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 closed pull request #168: [hotfix] Remove redundant Internal annotations from BroadcastUtils

2022-10-30 Thread GitBox


zhipeng93 closed pull request #168: [hotfix] Remove redundant Internal 
annotations from BroadcastUtils
URL: https://github.com/apache/flink-ml/pull/168


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on pull request #168: [hotfix] Remove redundant Internal annotations from BroadcastUtils

2022-10-30 Thread GitBox


zhipeng93 commented on PR #168:
URL: https://github.com/apache/flink-ml/pull/168#issuecomment-1296590671

   Thanks for the PR. As discussed in [1], @jiangxin369 has already 
resolved this in #162, so I am closing it.
   
   [1] https://github.com/apache/flink-ml/pull/162#discussion_r1007628054


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29802) ChangelogStateBackend supports native savepoint

2022-10-30 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-29802:


 Summary: ChangelogStateBackend supports native savepoint
 Key: FLINK-29802
 URL: https://issues.apache.org/jira/browse/FLINK-29802
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Hangxiang Yu
 Fix For: 1.17.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-10-30 Thread yuemeng (Jira)
yuemeng created FLINK-29801:
---

 Summary: OperatorCoordinator need open the way to operate 
metricGroup interface
 Key: FLINK-29801
 URL: https://issues.apache.org/jira/browse/FLINK-29801
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: yuemeng


Currently, we have no way to get a metric group instance in OperatorCoordinator.

In some cases, we may want to report metrics from the OperatorCoordinator, e.g., in the 
Flink/Hudi integration, where some metadata is sent to the operator coordinator to be 
committed to HDFS or HMS,

but we also need to report some metrics from the operator coordinator for monitoring 
purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29788) StatefulJobWBroadcastStateMigrationITCase failed in native savepoints

2022-10-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29788:
---
Labels: pull-request-available  (was: )

> StatefulJobWBroadcastStateMigrationITCase failed in native savepoints
> -
>
> Key: FLINK-29788
> URL: https://issues.apache.org/jira/browse/FLINK-29788
> Project: Flink
>  Issue Type: Bug
>  Components: Release System, Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2022-10-28-11-18-45-471.png
>
>
>  !image-2022-10-28-11-18-45-471.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] masteryhx opened a new pull request, #21196: [FLINK-29788][state] Disable changelog temporally when SnapshotMigrationTestBase triggers native savepoint

2022-10-30 Thread GitBox


masteryhx opened a new pull request, #21196:
URL: https://github.com/apache/flink/pull/21196

   
   
   ## What is the purpose of the change
   
   ChangelogStateBackend does not currently support native savepoints, which is a 
known limitation, so we just disable the changelog temporarily when SnapshotMigrationTestBase 
triggers a native savepoint.
   
   
   ## Brief change log
   
 - Make SnapshotMigrationTestBase#executeAndSnapshot disable the changelog when 
triggering a native savepoint.
   
   
   ## Verifying this change
   
   
   This change is already covered by existing tests, such as 
TypeSerializerSnapshotMigrationITCase/StatefulJobWBroadcastStateMigrationITCase.
   (FlinkVersion and ExecutionMode still need to be modified, which will be done by 
the release manager later).
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-29800) Continuous failover will leak the inprogress output file

2022-10-30 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626415#comment-17626415
 ] 

luoyuxia edited comment on FLINK-29800 at 10/31/22 6:12 AM:


What's your idea for cleaning up these files? From my side, it may not be 
trivial work.


was (Author: luoyuxia):
What's your idea for cleaning up these files?

> Continuous failover will leak the inprogress output file
> 
>
> Key: FLINK-29800
> URL: https://issues.apache.org/jira/browse/FLINK-29800
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Reporter: Aitozi
>Priority: Major
>
> When running a job that sinks to the file system, the in-progress files will 
> keep growing while the job keeps failing over, which harms the file system. I 
> think cleanup of the file currently being written to should be 
> performed when the job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] zhipeng93 merged pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles

2022-10-30 Thread GitBox


zhipeng93 merged PR #162:
URL: https://github.com/apache/flink-ml/pull/162


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles

2022-10-30 Thread GitBox


zhipeng93 commented on PR #162:
URL: https://github.com/apache/flink-ml/pull/162#issuecomment-1296585869

   Thanks for the update. Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29800) Continuous failover will leak the inprogress output file

2022-10-30 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626415#comment-17626415
 ] 

luoyuxia commented on FLINK-29800:
--

What's your idea for cleaning up these files?

> Continuous failover will leak the inprogress output file
> 
>
> Key: FLINK-29800
> URL: https://issues.apache.org/jira/browse/FLINK-29800
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Reporter: Aitozi
>Priority: Major
>
> When running a job that sinks to the file system, the in-progress files will 
> keep growing while the job keeps failing over, which harms the file system. I 
> think cleanup of the file currently being written to should be 
> performed when the job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] salvalcantara commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing

2022-10-30 Thread GitBox


salvalcantara commented on code in PR #21186:
URL: https://github.com/apache/flink/pull/21186#discussion_r1009057372


##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws 
Exception {
 assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
 assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+// elements that cannot be serialized should be silently skipped
+writer.write(null, SINK_WRITER_CONTEXT);
+timeService.trigger();
+assertThat(numBytesOut.getCount()).isEqualTo(0L);
+assertThat(numRecordsOut.getCount()).isEqualTo(0);
+assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
+assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+
+// but properly serialized elements should count just normally

Review Comment:
   @leonardBang I would also update this companion note...
   ```suggestion
   // but elements for which a non-null producer record is returned 
should count
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #21185: [FLINK-28643][runtime-web] HistoryServer support lazy unzip

2022-10-30 Thread GitBox


reswqa commented on code in PR #21185:
URL: https://github.com/apache/flink/pull/21185#discussion_r1008996802


##
flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java:
##
@@ -143,5 +143,16 @@ public class HistoryServerOptions {
 
code("IllegalConfigurationException"))
 .build());
 
+public static final ConfigOption<Integer> HISTORY_SERVER_CACHED_JOBS =
+key("historyserver.archive.cached-jobs")
+.intType()
+.defaultValue(500)
+.withDescription(

Review Comment:
   Why not use `withDescription(String description)` directly?
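   
   For illustration, a sketch of the plain-string form (the option name and default 
are taken from the diff above; the description text is just a placeholder, and the 
same static import of `key(...)` as in the file is assumed):
   
   ```java
   public static final ConfigOption<Integer> HISTORY_SERVER_CACHED_JOBS =
           key("historyserver.archive.cached-jobs")
                   .intType()
                   .defaultValue(500)
                   .withDescription(
                           "Maximum number of job archives kept unpacked by the HistoryServer.");
   ```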



##
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java:
##
@@ -30,12 +31,33 @@
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the HistoryServerStaticFileServerHandler. */
 class HistoryServerStaticFileServerHandlerTest {
 
+@Test
+void testExtractJobId() {

Review Comment:
   We should migrate the tests involved in this PR to JUnit 5 and AssertJ.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] leonardBang commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing

2022-10-30 Thread GitBox


leonardBang commented on code in PR #21186:
URL: https://github.com/apache/flink/pull/21186#discussion_r1009056433


##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws 
Exception {
 assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
 assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+// elements that cannot be serialized should be silently skipped

Review Comment:
   > Agreed. Maybe something along these lines?
   
   Yes, this is better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] salvalcantara commented on a diff in pull request #21186: [FLINK-29480][Connectors/Kafka] Skip null records when writing

2022-10-30 Thread GitBox


salvalcantara commented on code in PR #21186:
URL: https://github.com/apache/flink/pull/21186#discussion_r1009054195


##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws 
Exception {
 assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
 assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+// elements that cannot be serialized should be silently skipped

Review Comment:
   Agreed. Maybe something along these lines @leonardBang ?
   
   ```suggestion
   // elements for which the serializer returns null should be 
silently skipped
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1009034445


##
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts Expression to ParameterizedPredicate. Returns 
Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+private Function<String, String> quoteIdentifierFunction;
+
+public JdbcFilterPushdownPreparedStatementVisitor(
+Function<String, String> quoteIdentifierFunction) {
+this.quoteIdentifierFunction = quoteIdentifierFunction;
+}
+
+@Override
+public Optional<ParameterizedPredicate> visit(CallExpression call) {
+if 
(BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("<", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition()))
 {
+return renderBinaryOperator("<=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator(">", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition()))
 {
+return renderBinaryOperator(">=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("<>", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("OR", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("AND", call.getResolvedChildren());
+}
+
+return Optional.empty();
+}
+
+private Optional<ParameterizedPredicate> renderBinaryOperator(
+String operator, List<ResolvedExpression> allOperands) {
+Optional<ParameterizedPredicate> leftOperandString = 
allOperands.get(0).accept(this);
+
+Optional<ParameterizedPredicate> rightOperandString = 
allOperands.get(1).accept(this);
+
+return leftOperandString.flatMap(
+left -> rightOperandString.map(right -> left.combine(operator, 
right)));
+}
+
+@Override
+public Optional<ParameterizedPredicate> visit(ValueLiteralExpression 
litExp) {
+LogicalType tpe = litExp.getOutputDataType().getLogicalType();
+Serializable[] params = new Serializable[1];
+
+ParameterizedPredicate predicate = new ParameterizedPredicate("?");
+switch (tpe.getTypeRoot()) {

Review Comment:
   1. Could you order the case branches similarly to `LogicalTypeRoot`?
   2. There are still some remaining types, such as `CHAR`, `SMALLINT`, and `TINYINT`; I 
would suggest adding as many of these types as possible, unless we cannot for 
now.
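   
   To illustrate the second point, a rough sketch of how a few of the missing literal 
types might be extracted (a hypothetical helper, not from the PR; how the extracted 
value is wired into `ParameterizedPredicate` is left out, since that API is introduced 
by this PR):
   
   ```java
   import org.apache.flink.table.expressions.ValueLiteralExpression;
   
   import java.io.Serializable;
   import java.util.Optional;
   
   final class LiteralExtraction {
       /** Extracts the literal value for a few additional LogicalTypeRoot cases. */
       static Optional<Serializable> extract(ValueLiteralExpression litExp) {
           switch (litExp.getOutputDataType().getLogicalType().getTypeRoot()) {
               case CHAR:
               case VARCHAR:
                   return litExp.getValueAs(String.class).map(v -> (Serializable) v);
               case TINYINT:
                   return litExp.getValueAs(Byte.class).map(v -> (Serializable) v);
               case SMALLINT:
                   return litExp.getValueAs(Short.class).map(v -> (Serializable) v);
               default:
                   return Optional.empty();
           }
       }
   }
   ```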



##

[jira] [Created] (FLINK-29800) Continuous failover will leak the inprogress output file

2022-10-30 Thread Aitozi (Jira)
Aitozi created FLINK-29800:
--

 Summary: Continuous failover will leak the inprogress output file
 Key: FLINK-29800
 URL: https://issues.apache.org/jira/browse/FLINK-29800
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Aitozi


When running a job that sinks to the file system, the in-progress files will keep 
growing while the job keeps failing over, which harms the file system. I think 
cleanup of the file currently being written to should be performed when the job 
fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29609) Clean up jobmanager deployment on suspend after recording savepoint info

2022-10-30 Thread Hector Miuler Malpica Gallegos (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626372#comment-17626372
 ] 

Hector Miuler Malpica Gallegos commented on FLINK-29609:


[~sriramgr] In my opinion, this should only happen in application mode; in 
session mode, it should continue to exist, waiting for a new job.

> Clean up jobmanager deployment on suspend after recording savepoint info
> 
>
> Key: FLINK-29609
> URL: https://issues.apache.org/jira/browse/FLINK-29609
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Sriram Ganesh
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> Currently in case of suspending with savepoint. The jobmanager pod will 
> linger there forever after cancelling the job.
> This is currently used to ensure consistency in case the 
> operator/cancel-with-savepoint operation fails.
> Once we are sure however that the savepoint has been recorded and the job is 
> shut down, we should clean up all the resources. Optionally we can make this 
> configurable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28539) Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED

2022-10-30 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626370#comment-17626370
 ] 

Yanfei Lei commented on FLINK-28539:


[~usamj] Do you want to fix this? (I'd like to open a PR to address this.)

> Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED
> -
>
> Key: FLINK-28539
> URL: https://issues.apache.org/jira/browse/FLINK-28539
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Usamah Jassat
>Priority: Minor
>
> Investigating the RocksDB predefined options I see that 
> `setLevelCompactionDynamicLevelBytes` is set for SPINNING_DISK options but 
> not FLASH_SSD_OPTIMIZED.
>  
> From my research it looks like this change would improve the Space 
> Amplification of RocksDB [1] (which can also lead to a trade-off from 
> read/write amplification [2]). It makes sense to me that this feature should 
> be enabled for SSD's as they tend to have less space compared to their HDD 
> counterparts.
> There is also an argument to be made to also disable it for SPINNING_DISK 
> options as it could give increased read/write performance [2]
> [1] [http://rocksdb.org/blog/2015/07/23/dynamic-level.html]
> [2] 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#amplification-factors]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28539) Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED

2022-10-30 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566618#comment-17566618
 ] 

Yanfei Lei edited comment on FLINK-28539 at 10/31/22 4:20 AM:
--

Hi [~usamj], +1 for enabling {{LevelCompactionDynamicLevelBytes}} for 
FLASH_SSD_OPTIMIZED. Note that it is already possible to use 
{{LevelCompactionDynamicLevelBytes}} by manually configuring 
"RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE: true".
{quote}There is also an argument to be made to also disable it for 
SPINNING_DISK options as it could give increased read/write performance [2]
{quote}
As the doc says, "Spinning disks usually provide much lower random read 
throughput than flash. If you use level-based compaction, use 
options.level_compaction_dynamic_level_bytes=true." I don't see it suggesting that we 
disable {{LevelCompactionDynamicLevelBytes}}?
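
For reference, a minimal sketch of what enabling this looks like at the RocksDB level 
(illustrative only; the actual change proposed here would live in Flink's predefined 
options for FLASH_SSD_OPTIMIZED):

```java
import org.rocksdb.ColumnFamilyOptions;

final class DynamicLevelBytesExample {
    // Enables dynamic level target sizes for level-based compaction,
    // which reduces space amplification.
    static ColumnFamilyOptions createColumnFamilyOptions() {
        return new ColumnFamilyOptions().setLevelCompactionDynamicLevelBytes(true);
    }
}
```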

 


was (Author: yanfei lei):
Hi [~usamj], you can already use `setLevelCompactionDynamicLevelBytes` for 
FLASH_SSD_OPTIMIZED by manually configuring 
"RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE: true".

 
{quote}There is also an argument to be made to also disable it for 
SPINNING_DISK options as it could give increased read/write performance [2]
{quote}
Currently, the default value of PREDEFINED_OPTIONS is [empty in 
Flink|https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java#L77-L90],
 so users can choose whether to enable it based on their own situation.

> Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED
> -
>
> Key: FLINK-28539
> URL: https://issues.apache.org/jira/browse/FLINK-28539
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Usamah Jassat
>Priority: Minor
>
> Investigating the RocksDB predefined options I see that 
> `setLevelCompactionDynamicLevelBytes` is set for SPINNING_DISK options but 
> not FLASH_SSD_OPTIMIZED.
>  
> From my research it looks like this change would improve the Space 
> Amplification of RocksDB [1] (which can also lead to a trade-off from 
> read/write amplification [2]). It makes sense to me that this feature should 
> be enabled for SSD's as they tend to have less space compared to their HDD 
> counterparts.
> There is also an argument to be made to also disable it for SPINNING_DISK 
> options as it could give increased read/write performance [2]
> [1] [http://rocksdb.org/blog/2015/07/23/dynamic-level.html]
> [2] 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#amplification-factors]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] reswqa commented on pull request #21137: [FLINK-29234][runtime] JobMasterServiceLeadershipRunner handle leader event in a separate executor to avoid dead lock

2022-10-30 Thread GitBox


reswqa commented on PR #21137:
URL: https://github.com/apache/flink/pull/21137#issuecomment-1296507911

   > We might also want to check the other `LeaderContender` implementations 
(as you already mentioned, `ResourceManagerImpl` does not suffer from this bug 
but the others might because they don't use this `handleLeaderEventHandler` 
pattern).
   
   @XComp I totally agree that we should also check the other `LeaderContender` 
implementations to ensure they do not suffer from this. However, from my point of view, 
and to limit the scope of this pull request, I suggest that only 
the `JobMasterServiceLeadershipRunner` be tested in this PR. For the other 
`LeaderContender` implementations, I will open a separate ticket for tracking. WDYT?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29639) Add ResourceId in TransportException for debugging

2022-10-30 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626366#comment-17626366
 ] 

Xintong Song commented on FLINK-29639:
--

Hi [~Jiangang], are there any updates on this issue?

> Add ResourceId in TransportException for debugging 
> ---
>
> Key: FLINK-29639
> URL: https://issues.apache.org/jira/browse/FLINK-29639
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Liu
>Assignee: Liu
>Priority: Major
>
> When the taskmanager is lost, only the host and port are shown in the 
> exception. It is hard to find the exact taskmanager by its resourceId. Adding 
> the ResourceId info will help a lot in debugging the job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-28863) Snapshot result of RocksDB native savepoint should have empty shared-state

2022-10-30 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-28863:


Assignee: Jinzhong Li

> Snapshot result of RocksDB native savepoint should have empty shared-state
> --
>
> Key: FLINK-28863
> URL: https://issues.apache.org/jira/browse/FLINK-28863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Assignee: Jinzhong Li
>Priority: Major
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> The current snapshot result of a RocksDB native savepoint has non-empty shared 
> state, which is obviously not correct as all snapshot artifacts already stay 
> in the exclusive checkpoint scope folder.
> This does no real harm because we do not register the 
> snapshot results of RocksDB native savepoints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28863) Snapshot result of RocksDB native savepoint should have empty shared-state

2022-10-30 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626365#comment-17626365
 ] 

Yun Tang commented on FLINK-28863:
--

[~lijinzhong] I think this idea is correct; I have assigned the ticket to you.

> Snapshot result of RocksDB native savepoint should have empty shared-state
> --
>
> Key: FLINK-28863
> URL: https://issues.apache.org/jira/browse/FLINK-28863
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Yun Tang
>Priority: Major
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> The current snapshot result of a RocksDB native savepoint has non-empty shared 
> state, which is obviously not correct as all snapshot artifacts already stay 
> in the exclusive checkpoint scope folder.
> This does no real harm because we do not register the 
> snapshot results of RocksDB native savepoints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28390) Allows RocksDB to configure FIFO Compaction to reduce CPU overhead.

2022-10-30 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626363#comment-17626363
 ] 

Yanfei Lei commented on FLINK-28390:


[~Ming Li] As the bug has been fixed and the changes are minor, I don't think 
it will conflict with the [FRocksDB fork|https://github.com/ververica/frocksdb] 
used by Flink. I'd like to cherry-pick this to frocksdb.

And I'm +1 on [~yunta]'s comments. The RocksDB configuration documentation is 
already complicated; introducing a new TTL setting may increase the burden on 
users who use TTL.
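
For context, FIFO compaction can already be enabled today without any new predefined option, by supplying a custom options factory. A rough sketch (the class name and the size cap are illustrative, not an existing Flink option):
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;

import java.util.Collection;

/** Illustrative options factory that switches column families to FIFO compaction. */
public class FifoCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // cap total SST size at ~8 GB (illustrative value, set slightly above the state TTL budget)
        CompactionOptionsFIFO fifo = new CompactionOptionsFIFO().setMaxTableFilesSize(8L << 30);
        handlesToClose.add(fifo);
        return currentOptions
                .setCompactionStyle(CompactionStyle.FIFO)
                .setCompactionOptionsFIFO(fifo);
    }
}
{code}
Such a factory would then be registered via {{EmbeddedRocksDBStateBackend#setRocksDBOptions}}.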

> Allows RocksDB to configure FIFO Compaction to reduce CPU overhead.
> ---
>
> Key: FLINK-28390
> URL: https://issues.apache.org/jira/browse/FLINK-28390
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: ming li
>Priority: Major
>
> We know that the fifo compaction strategy may silently delete data and may 
> lose data for the business. But in some scenarios, FIFO compaction can be a 
> very effective way to reduce CPU usage.
>  
> Flink's TaskManagers are usually small-scale processes, e.g. allocated 
> 4 CPUs and 16G memory. When the state size is small, the CPU overhead 
> of RocksDB is not high, but as the state grows, RocksDB may 
> spend much of its time compacting, which occupies a large amount 
> of CPU and affects the actual computation.
>  
> We usually configure a TTL for the state, so when using FIFO we can configure 
> it to be slightly longer than the TTL, so that the upper layer is the same as 
> before. 
>  
> Although the FIFO Compaction strategy may bring space amplification, the disk 
> is cheaper than the CPU after all, so the overall cost is reduced.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28889) Hybrid shuffle should supports multiple consumer

2022-10-30 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-28889.

Resolution: Done

master (1.17): d11940c4a78c71548b5a06af50da2e5f9cb68918

> Hybrid shuffle should supports multiple consumer
> 
>
> Key: FLINK-28889
> URL: https://issues.apache.org/jira/browse/FLINK-28889
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.16.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Hybrid shuffle does not support multiple consumers for a single subpartition's 
> data. This brings some drawbacks, such as the inability to support 
> partition reuse and speculative execution. In particular, it cannot support 
> the broadcast optimization, that is, hybrid shuffle writes multiple copies of 
> broadcast data. This wastes memory and disk space and affects 
> the performance of the shuffle write phase. Ideally, with the full spilling 
> strategy, any broadcast data (record or event) should be written only once 
> in memory, and the same is true for disk.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong closed pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

2022-10-30 Thread GitBox


xintongsong closed pull request #21122: [FLINK-28889] Hybrid shuffle supports 
multiple consumer and broadcast optimization
URL: https://github.com/apache/flink/pull/21122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles

2022-10-30 Thread GitBox


jiangxin369 commented on code in PR #162:
URL: https://github.com/apache/flink-ml/pull/162#discussion_r100968


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java:
##
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Helper class to compute an approximate quantile summary. This 
implementation is based on the
+ * algorithm proposed in the paper: "Space-efficient Online Computation of 
Quantile Summaries" by
+ * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670)
+ */
+@Internal

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles

2022-10-30 Thread GitBox


jiangxin369 commented on code in PR #162:
URL: https://github.com/apache/flink-ml/pull/162#discussion_r1008999126


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java:
##
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Helper class to compute an approximate quantile summary. This 
implementation is based on the
+ * algorithm proposed in the paper: "Space-efficient Online Computation of 
Quantile Summaries" by
+ * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670)
+ */
+@Internal
+public class QuantileSummary implements Serializable {
+
+/** The target relative error. */
+private final double relativeError;
+
+/**
+ * The compression threshold. After the internal buffer of statistics 
crosses this size, it
+ * attempts to compress the statistics together.
+ */
+private final int compressThreshold;
+
+/** The count of all the elements inserted to be calculated. */
+private final long count;
+
+/** A buffer of quantile statistics. */
+private final List sampled;
+
+/** The default size of head buffer. */
+private static final int DEFAULT_HEAD_SIZE = 5;
+
+/** The default compression threshold. */
+private static final int DEFAULT_COMPRESS_THRESHOLD = 1;
+
+/** A buffer of the latest samples seen so far. */
+private List headBuffer = new ArrayList<>(DEFAULT_HEAD_SIZE);
+
+/**
+ * QuantileSummary Constructor.
+ *
+ * @param relativeError The target relative error.
+ */
+public QuantileSummary(double relativeError) {
+this(relativeError, DEFAULT_COMPRESS_THRESHOLD);
+}
+
+/**
+ * QuantileSummary Constructor.
+ *
+ * @param relativeError The target relative error.
+ * @param compressThreshold the compression threshold. After the internal 
buffer of statistics
+ * crosses this size, it attempts to compress the statistics together.
+ */
+public QuantileSummary(double relativeError, int compressThreshold) {
+this(relativeError, compressThreshold, Collections.EMPTY_LIST, 0);
+}
+
+/**
+ * QuantileSummary Constructor.
+ *
+ * @param relativeError The target relative error.
+ * @param compressThreshold the compression threshold.
+ * @param sampled A buffer of quantile statistics. See the G-K article for 
more details.
+ * @param count The count of all the elements inserted in the sampled 
buffer.
+ */
+private QuantileSummary(
+double relativeError, int compressThreshold, List 
sampled, long count) {
+Preconditions.checkArgument(
+relativeError > 0 && relativeError < 1,
+"The relative error must lie between 0 and 1.");
+Preconditions.checkArgument(
+compressThreshold > 0, "The compress threshold must be greater 
than 0.");
+this.relativeError = relativeError;
+this.compressThreshold = compressThreshold;
+this.sampled = sampled;
+this.count = count;
+}
+
+/**
+ * Insert a new observation to the summary.
+ *
+ * @param item The new observation to insert into the summary.
+ * @return A summary with the given observation inserted into the summary.
+ */
+public QuantileSummary insert(Double item) {
+headBuffer.add(item);
+if (headBuffer.size() >= DEFAULT_HEAD_SIZE) {
+QuantileSummary result = insertHeadBuffer();
+if (result.sampled.size() >= compressThreshold) {
+return result.compress();
+} else {
+return result;
+}
+} else {
+return this;
+}
+}
+
+/**
+ * Return

[jira] [Updated] (FLINK-29795) The source.file.stream.io-fetch-size can not be set by table properties

2022-10-30 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-29795:
---
Description: The {{source.file.stream.io-fetch-size}} option is used in the bulk 
format mode, but it is not exposed through the filesystem connector options. If I 
try to use it in the WITH properties, it fails with {{Unsupported options}}. It 
can currently only be set in {{flink-conf.yaml}}, and the same session cluster 
then shares the same config value. It's not convenient to adjust, and I think the 
option should be scoped to the table's properties.  (was: The 
{{source.file.stream.io-fetch-size}} is used in the bulk format mode, but it is 
not exposed to the filesystem connector options. If I try to use it in the with 
property, it will fails with \{{Unsupported options}}. It can only be set by 
add it to the {{flink-conf.yaml}} now. It's not convenient and it should be 
scoped to the table's property.)
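
For illustration, the usage this ticket asks for (the option key comes from the ticket; the table definition and the value are made up):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Today the last option is rejected as unsupported; the ticket proposes to allow it per table.
tEnv.executeSql(
        "CREATE TABLE fs_source (line STRING) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = 'file:///tmp/data',"
                + " 'format' = 'raw',"
                + " 'source.file.stream.io-fetch-size' = '4 mb'"
                + ")");
{code}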

> The source.file.stream.io-fetch-size can not be set by table properties
> ---
>
> Key: FLINK-29795
> URL: https://issues.apache.org/jira/browse/FLINK-29795
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.17.0
>Reporter: Aitozi
>Priority: Major
>
> The {{source.file.stream.io-fetch-size}} option is used in the bulk format mode, 
> but it is not exposed through the filesystem connector options. If I try to use 
> it in the WITH properties, it fails with {{Unsupported options}}. It can 
> currently only be set in {{flink-conf.yaml}}, and the same session 
> cluster then shares the same config value. It's not convenient to adjust, and I 
> think the option should be scoped to the table's properties.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29639) Add ResourceId in TransportException for debugging

2022-10-30 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-29639:
-
Component/s: Runtime / Coordination

> Add ResourceId in TransportException for debugging 
> ---
>
> Key: FLINK-29639
> URL: https://issues.apache.org/jira/browse/FLINK-29639
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Liu
>Assignee: Liu
>Priority: Major
>
> When the taskmanager is lost, only the host and port are shown in the 
> exception. It is hard to find the exact taskmanager by its resourceId. Adding 
> the ResourceId info will help a lot in debugging the job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] leonardBang commented on a diff in pull request #21186: [FLINK-29480][Connector/Kafka] Skip null records when writing

2022-10-30 Thread GitBox


leonardBang commented on code in PR #21186:
URL: https://github.com/apache/flink/pull/21186#discussion_r1008991930


##
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##
@@ -194,8 +194,10 @@
 public void write(IN element, Context context) throws IOException {
 final ProducerRecord record =
 recordSerializer.serialize(element, kafkaSinkContext, 
context.timestamp());

Review Comment:
   ```suggestion
   public void write(@Nullable IN element, Context context) throws 
IOException {
   final ProducerRecord record =
   recordSerializer.serialize(element, kafkaSinkContext, 
context.timestamp());
   ```



##
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java:
##
@@ -54,7 +54,7 @@ default void open(
  * @param element element to be serialized
  * @param context context to possibly determine target partition
  * @param timestamp timestamp
- * @return Kafka {@link ProducerRecord}
+ * @return Kafka {@link ProducerRecord} (null if the element cannot be 
serialized)
  */
 ProducerRecord serialize(T element, KafkaSinkContext 
context, Long timestamp);

Review Comment:
   ```suggestion
* @return Kafka {@link ProducerRecord}  or null if the given element 
cannot be serialized
*/
   @Nullable ProducerRecord serialize(T element, 
KafkaSinkContext context, Long timestamp);
   ```



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws 
Exception {
 assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
 assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+// elements that cannot be serialized should be silently skipped

Review Comment:
   minor: we can improve the note, because not all serializers will return 
`null` when an element cannot be serialized, and not every `null` is due to an 
element that cannot be serialized.
   What we can ensure is only that the `KafkaSinkWriter` will silently skip the 
`null` records returned by the serializer
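
   For reference, those semantics boil down to something like the sketch below 
(only `recordSerializer`, `kafkaSinkContext` and the serialize call are taken 
from the snippet above; the producer/counter names are placeholders, not the 
actual `KafkaWriter` fields):
   ```java
   @Override
   public void write(@Nullable IN element, Context context) throws IOException {
       ProducerRecord<byte[], byte[]> record =
               recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
       if (record == null) {
           // the serializer chose to drop this element: skip it without touching any counters
           return;
       }
       producer.send(record, deliveryCallback); // placeholder names
       numRecordsOutCounter.inc();
   }
   ```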



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws 
Exception {
 assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
 assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
 
+// elements that cannot be serialized should be silently skipped
+writer.write(null, SINK_WRITER_CONTEXT);
+timeService.trigger();
+assertThat(numBytesOut.getCount()).isEqualTo(0L);
+assertThat(numRecordsOut.getCount()).isEqualTo(0);
+assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
+assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+
+// but properly serialized elements should count just normally

Review Comment:
   Could we add a new test like `testWriteNullElement` instead of modifying an 
existing test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] godfreyhe merged pull request #578: [hotfix] Fix some typos in 1.16 announcement

2022-10-30 Thread GitBox


godfreyhe merged PR #578:
URL: https://github.com/apache/flink-web/pull/578


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28553) The serializer in StateMap has not been updated when metaInfo of StateTable updated

2022-10-30 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626350#comment-17626350
 ] 

Yanfei Lei commented on FLINK-28553:


Thanks [~roman] for merging this into master: 
[8df50536ef913b63620d896423c39cdd01941c55|https://github.com/apache/flink/commit/8df50536ef913b63620d896423c39cdd01941c55]

 

> The serializer in StateMap has not been updated when metaInfo of StateTable 
> updated
> ---
>
> Key: FLINK-28553
> URL: https://issues.apache.org/jira/browse/FLINK-28553
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.5, 1.15.1
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Minor
> Fix For: 1.17.0
>
>
> When the meta info in StateTable is updated, the serializer in StateMap is not 
> updated.
> (See StateTable#setMetaInfo)
> Values may be serialized/deserialized/copied incorrectly after triggering 
> state migration of the HashMapStateBackend.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28581) Test Changelog StateBackend V2 Manually

2022-10-30 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei closed FLINK-28581.
--
Fix Version/s: 1.16.0
   Resolution: Resolved

> Test Changelog StateBackend V2 Manually
> ---
>
> Key: FLINK-28581
> URL: https://issues.apache.org/jira/browse/FLINK-28581
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Affects Versions: 1.16.0
>Reporter: Hangxiang Yu
>Priority: Major
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29748) Expose the optimize phase in the connector context

2022-10-30 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626349#comment-17626349
 ] 

Aitozi commented on FLINK-29748:



{noformat}
The rule will only apply once during the optimization because the rule uses hep 
optimizer which is not based on the cost.
{noformat}

Yes, you are right. By {{optimization}} I mainly mean the whole planner 
phase, so the connector does not know whether the optimization has finished.
As a workaround, I think we can add a callback interface, e.g. 
{{onOptimizeFinished}} or {{onTranslateExecNodeStart}}, on the 
{{ScanTableSource}}. All external validation of the final result could be 
performed at that stage. What do you think about it?
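
A rough sketch of what such a callback could look like (purely a proposal, this method does not exist in Flink today):
{code:java}
public interface ScanTableSource extends DynamicTableSource {

    // ... existing methods (getChangelogMode, getScanRuntimeProvider, ...) ...

    /** Hypothetical hook, called by the planner once the whole optimization has finished. */
    default void onOptimizeFinished() {
        // a connector could validate the final static plan here, e.g. fail fast
        // if the pushed-down partition list ended up empty or too large
    }
}
{code}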

> Expose the optimize phase in the connector context
> --
>
> Key: FLINK-29748
> URL: https://issues.apache.org/jira/browse/FLINK-29748
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Aitozi
>Priority: Minor
>
> Currently, a connector cannot know whether the whole optimization is 
> finished.
> Once the optimization is finished, all information is static, e.g. the 
> partitions being read. If I want to validate the final optimized result (such 
> as whether too many or no partitions are read), the connector needs to know 
> the current phase. I think the {{ScanContext}} is a good place to expose this 
> information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB

2022-10-30 Thread Caizhi Weng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347
 ] 

Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:18 AM:
---

Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}'s and 
{{break}}'s in a {{while}} loop, and we also need to deal with the loop 
variable in a {{for}} loop.

Due to these cases I choose not to recursively split most of the looping code 
blocks when implementing the first version of {{JavaCodeSplitter}}. However for 
{{if}} code blocks I implement a special {{IfStatementRewriter}} to split them 
because they are quite common in generated code and they don't suffer from 
those corner cases.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}'s or {{break}}'s. If not we may choose to split code blocks 
without these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator. However if other 
operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

To me this issue seems like a common one. I'd prefer fixing the 
{{FunctionSplitter}} if possible.


was (Author: tsreaper):
Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}'s and 
{{break}}'s in a {{while}} loop, and we also need to deal with the loop 
variable in a {{for}} loop. Due to these cases I choose not to recursively 
split most of the looping code blocks when implementing the first version of 
{{JavaCodeSplitter}}. However for {{if}} code blocks I implement a special 
{{IfStatementRewriter}} to split them because they are quite common in 
generated code and they don't suffer from those corner cases.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}'s or {{break}}'s. If not we may choose to split code blocks 
without these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator. However if other 
operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

To me this issue seems like a common one. I'd prefer fixing the 
{{FunctionSplitter}} if possible.

> Code of method 
> "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V"
>  of class "HashAggregateWithKeys$9211" grows beyond 64 KB
> -
>
> Key: FLINK-27246
> URL: https://issues.apache.org/jira/browse/FLINK-27246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Maciej Bryński
>Priority: Major
>
> I think this bug should get fixed in 
> https://issues.apache.org/jira/browse/FLINK-23007
> Unfortunately I spotted it on Flink 1.14.3
> {code}
> java.lang.RuntimeException: Could not instantiate generated class 
> 'HashAggregateWithKeys$9211'
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63)
>  ~[fl

[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB

2022-10-30 Thread Caizhi Weng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347
 ] 

Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:17 AM:
---

Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}'s and 
{{break}}'s in a {{while}} loop, and we also need to deal with the loop 
variable in a {{for}} loop. Due to these cases I choose not to recursively 
split most of the looping code blocks when implementing the first version of 
{{JavaCodeSplitter}}. However for {{if}} code blocks I implement a special 
{{IfStatementRewriter}} to split them because they are quite common in 
generated code and they don't suffer from those corner cases.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}'s or {{break}}'s. If not we may choose to split code blocks 
without these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator. However if other 
operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

To me this issue seems like a common one. I'd prefer fixing the 
{{FunctionSplitter}} if possible.


was (Author: tsreaper):
Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}'s and 
{{break}}'s in a {{while}} loop, and we also need to deal with the loop 
variable in a {{for}} loop. Due to these cases I choose not to recursively 
split the code block when implementing the first version of 
{{JavaCodeSplitter}}.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}'s or {{break}}'s. If not we may choose to split code blocks 
without these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator. However if other 
operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

To me this issue seems like a common one. I'd prefer fixing the 
{{FunctionSplitter}} if possible.

> Code of method 
> "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V"
>  of class "HashAggregateWithKeys$9211" grows beyond 64 KB
> -
>
> Key: FLINK-27246
> URL: https://issues.apache.org/jira/browse/FLINK-27246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Maciej Bryński
>Priority: Major
>
> I think this bug should get fixed in 
> https://issues.apache.org/jira/browse/FLINK-23007
> Unfortunately I spotted it on Flink 1.14.3
> {code}
> java.lang.RuntimeException: Could not instantiate generated class 
> 'HashAggregateWithKeys$9211'
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1

[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB

2022-10-30 Thread Caizhi Weng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347
 ] 

Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:15 AM:
---

Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}'s and 
{{break}}'s in a {{while}} loop, and we also need to deal with the loop 
variable in a {{for}} loop. Due to these cases I choose not to recursively 
split the code block when implementing the first version of 
{{JavaCodeSplitter}}.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}'s or {{break}}'s. If not we may choose to split code blocks 
without these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator. However if other 
operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

To me this issue seems like a common one. I'd prefer fixing the 
{{FunctionSplitter}} if possible.


was (Author: tsreaper):
Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}'s and 
{{break}}'s in a {{while}} loop, and we also need to deal with the loop 
variable in a {{for}} loop. Due to these cases I choose not to recursively 
split the code block when implementing the first version of 
{{JavaCodeSplitter}}.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}'s or {{break}}'s. If not we may choose to split code blocks 
without these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator (if possible). However 
if other operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

> Code of method 
> "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V"
>  of class "HashAggregateWithKeys$9211" grows beyond 64 KB
> -
>
> Key: FLINK-27246
> URL: https://issues.apache.org/jira/browse/FLINK-27246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Maciej Bryński
>Priority: Major
>
> I think this bug should get fixed in 
> https://issues.apache.org/jira/browse/FLINK-23007
> Unfortunately I spotted it on Flink 1.14.3
> {code}
> java.lang.RuntimeException: Could not instantiate generated class 
> 'HashAggregateWithKeys$9211'
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.12-1.14.3-stream1.jar

[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB

2022-10-30 Thread Caizhi Weng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347
 ] 

Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:14 AM:
---

Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}'s and 
{{break}}'s in a {{while}} loop, and we also need to deal with the loop 
variable in a {{for}} loop. Due to these cases I choose not to recursively 
split the code block when implementing the first version of 
{{JavaCodeSplitter}}.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}'s or {{break}}'s. If not we may choose to split code blocks 
without these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator (if possible). However 
if other operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.


was (Author: tsreaper):
Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}s and 
{{break}}s in a {{while}} loop, and we also need to deal with the loop variable 
in a {{for}} loop. Due to these cases I choose not to recursively split the 
code block when implementing the first version of {{JavaCodeSplitter}}.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}s or {{break}}s. If not we may choose to split code blocks without 
these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator (if possible). However 
if other operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

> Code of method 
> "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V"
>  of class "HashAggregateWithKeys$9211" grows beyond 64 KB
> -
>
> Key: FLINK-27246
> URL: https://issues.apache.org/jira/browse/FLINK-27246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Maciej Bryński
>Priority: Major
>
> I think this bug should get fixed in 
> https://issues.apache.org/jira/browse/FLINK-23007
> Unfortunately I spotted it on Flink 1.14.3
> {code}
> java.lang.RuntimeException: Could not instantiate generated class 
> 'HashAggregateWithKeys$9211'
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(T

[jira] [Commented] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB

2022-10-30 Thread Caizhi Weng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347
 ] 

Caizhi Weng commented on FLINK-27246:
-

Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer some insight about this.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}s and 
{{break}}s in a {{while}} loop, and we also need to deal with the loop variable 
in a {{for}} loop. Due to these cases I choose not to recursively split the 
code block when implementing the first version of {{JavaCodeSplitter}}.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}s or {{break}}s. If not we may choose to split code blocks without 
these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator (if possible). However 
if other operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

> Code of method 
> "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V"
>  of class "HashAggregateWithKeys$9211" grows beyond 64 KB
> -
>
> Key: FLINK-27246
> URL: https://issues.apache.org/jira/browse/FLINK-27246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Maciej Bryński
>Priority: Major
>
> I think this bug should get fixed in 
> https://issues.apache.org/jira/browse/FLINK-23007
> Unfortunately I spotted it on Flink 1.14.3
> {code}
> java.lang.RuntimeException: Could not instantiate generated class 
> 'HashAggregateWithKeys$9211'
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
> ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
> ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
> ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:102)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:83)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   ... 11 more
> Caused by: 
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compile

[jira] [Comment Edited] (FLINK-27246) Code of method "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V" of class "HashAggregateWithKeys$9211" grows beyond 64 KB

2022-10-30 Thread Caizhi Weng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626347#comment-17626347
 ] 

Caizhi Weng edited comment on FLINK-27246 at 10/31/22 2:13 AM:
---

Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer my thoughts.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}s and 
{{break}}s in a {{while}} loop, and we also need to deal with the loop variable 
in a {{for}} loop. Due to these cases I choose not to recursively split the 
code block when implementing the first version of {{JavaCodeSplitter}}.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}s or {{break}}s. If not we may choose to split code blocks without 
these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator (if possible). However 
if other operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.


was (Author: tsreaper):
Hi [~KristoffSC]!

Thanks for your interest in solving this issue. I'm the author of 
{{JavaCodeSplitter}} and I'd like to offer some insight about this.

*1. if my understanding and proposed high level solution for splitting the 
problematic code block into smaller chunks is correct?*

Your understanding is quite correct. However it might not be that easy to solve 
this issue due to some corner cases. For example we may have {{continue}}s and 
{{break}}s in a {{while}} loop, and we also need to deal with the loop variable 
in a {{for}} loop. Due to these cases I choose not to recursively split the 
code block when implementing the first version of {{JavaCodeSplitter}}.

I'm not sure if the generated code in this particular ticket contains 
{{continue}}s or {{break}}s. If not we may choose to split code blocks without 
these keywords.

*2. should this be done by FunctionSplitter or this should be implemented in 
Scala code for code generation?*

It depends. If this issue is only caused by a single operator I'd like to 
change the code generation Scala code in that operator (if possible). However 
if other operators may also cause this issue it would be better to fix the 
{{FunctionSplitter}}.

> Code of method 
> "processElement(Lorg/apache/flink/streaming/runtime/streamrecord/StreamRecord;)V"
>  of class "HashAggregateWithKeys$9211" grows beyond 64 KB
> -
>
> Key: FLINK-27246
> URL: https://issues.apache.org/jira/browse/FLINK-27246
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Maciej Bryński
>Priority: Major
>
> I think this bug should get fixed in 
> https://issues.apache.org/jira/browse/FLINK-23007
> Unfortunately I spotted it on Flink 1.14.3
> {code}
> java.lang.RuntimeException: Could not instantiate generated class 
> 'HashAggregateWithKeys$9211'
>   at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:85)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>  ~[flink-table_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:198)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:63)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:666)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.12-1.14.3-stream1.jar:1.14.3-stream1]
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndI

[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

2022-10-30 Thread GitBox


jiangxin369 commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1008977825


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Imputer estimator completes missing values in a dataset. Missing values 
can be imputed using

Review Comment:
   How about `The Imputer estimator completes missing values of the input 
columns. `, since the resulting model can also transform an unbounded data 
stream.
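
   For readers of the thread, a rough usage sketch of the estimator added in this 
PR (the setter names follow the usual Flink ML param pattern and the column names 
are made up):
   ```java
   Imputer imputer =
           new Imputer()
                   .setInputCols("height", "weight")
                   .setOutputCols("height_imputed", "weight_imputed")
                   .setStrategy("mean");

   // fit() computes the surrogate values (e.g. per-column means) on the training table;
   // the resulting model can then fill missing values, also on an unbounded stream.
   ImputerModel model = imputer.fit(trainTable);
   Table result = model.transform(inputTable)[0];
   ```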



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29748) Expose the optimize phase in the connector context

2022-10-30 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626343#comment-17626343
 ] 

Shengkai Fang commented on FLINK-29748:
---

Yes. I think you are right. 

But I am confused about 

> If I want to validate that the source does not consume too many partitions, from the 
> connector's perspective it does not know whether the optimization is finished, 
> so it does not know when to apply the validation on the final optimization 
> results.

The rule will only be applied once during the optimization because it runs in the HEP 
optimizer, which is not cost-based.
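
For illustration only, a minimal sketch of the validation the reporter describes, assuming 
the context exposed such a phase flag. The method {{isOptimizationFinished()}} and the fields 
{{remainingPartitions}}, {{MAX_PARTITIONS}} and {{runtimeProvider}} are hypothetical names, 
not an existing or agreed API:

{code:java}
// Hypothetical sketch only: ScanContext does not expose such a flag today.
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    if (context.isOptimizationFinished()) {              // hypothetical method
        int partitions = remainingPartitions.size();     // left over after partition pushdown
        if (partitions == 0 || partitions > MAX_PARTITIONS) {
            throw new ValidationException(
                    "Scan would read " + partitions + " partitions after optimization.");
        }
    }
    return runtimeProvider; // whatever the source would normally return
}
{code}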


> Expose the optimize phase in the connector context
> --
>
> Key: FLINK-29748
> URL: https://issues.apache.org/jira/browse/FLINK-29748
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Aitozi
>Priority: Minor
>
> Currently, the connector cannot know whether the whole optimization is 
> finished.
> Once the optimization has finished, all information is static, e.g. the reading 
> partitions. If I want to validate the final optimized result (like whether 
> the set of reading partitions is too large or empty), the connector needs to know 
> the current phase. I think the {{ScanContext}} is a good place to expose this 
> information. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-10-30 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626342#comment-17626342
 ] 

Xintong Song commented on FLINK-29572:
--

I'm still not convinced that this is a bug in Flink. Maybe we can live with the 
disagreement for now and focus on solving your problem first.

bq. You don't have to configure different ports for each task manager. You just 
need to remove `taskmanager.rpc.port` from your configuration, and Flink by 
default should use random ports.
Have you tried the random port approach?

One more question: when not binding to the loopback address, does all the 
traffic still go through the proxy?
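
For reference, the random-port approach only requires dropping the fixed port from the 
TaskManager configuration; this is a plain config sketch (the port value shown is just an 
example), nothing Istio-specific:

{code}
# flink-conf.yaml on the TaskManagers
# Remove / comment out the fixed RPC port; without an explicit value Flink's
# default lets each TaskManager bind a random port.
# taskmanager.rpc.port: 6122
{code}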

> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.2
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently the Flink Task Manager tries different local interfaces when connecting 
> to the Resource Manager, and the first one it tries is the loopback interface. 
> Normally, if the Job Manager is running on a remote host/container, connecting via 
> the loopback interface fails and the Task Manager picks up the correct IP address.
> However, if the Task Manager runs behind a proxy, the loopback interface can 
> reach the remote host as well. This results in 127.0.0.1 being reported to the 
> Resource Manager during registration even though the Job Manager/Resource Manager 
> runs on a remote host, and problems follow. For us, only one Task Manager can 
> register in this case.
> I suggest adding a configuration option to skip the loopback interface check when we 
> know the Job/Resource Manager is running on a remote host/container.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29792) FileStoreCommitTest is unstable and may stuck

2022-10-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29792:
---
Labels: pull-request-available  (was: )

> FileStoreCommitTest is unstable and may stuck
> -
>
> Key: FLINK-29792
> URL: https://issues.apache.org/jira/browse/FLINK-29792
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.3.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>  Labels: pull-request-available
>
> {{FileStoreCommitTest}} may get stuck because the {{FileStoreCommit}} in 
> {{TestCommitThread}} does not commit an APPEND snapshot when no new files are 
> produced. In this case, if the following COMPACT snapshot conflicts with the 
> current merge tree, the test will get stuck.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] tsreaper opened a new pull request, #341: [FLINK-29792] Fix unstable test FileStoreCommitTest which may stuck

2022-10-30 Thread GitBox


tsreaper opened a new pull request, #341:
URL: https://github.com/apache/flink-table-store/pull/341

   `FileStoreCommitTest` may get stuck because the `FileStoreCommit` in 
`TestCommitThread` does not commit an APPEND snapshot when no new files are 
produced. In this case, if the following COMPACT snapshot conflicts with the 
current merge tree, the test will get stuck.
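
   For readers skimming the thread, a purely illustrative sketch of the failure mode 
described above; the types and helpers (`DataFile`, `commit`, `emptyCommittable`, 
`committableFor`) are made up and this is not necessarily how the PR itself fixes it:

```java
// Illustrative only: hypothetical shape of the test's commit step.
void commitOnce(List<DataFile> producedFiles) {
    if (producedFiles.isEmpty()) {
        // Previously the thread returned here without committing anything,
        // i.e. no APPEND snapshot was produced. A fix consistent with the
        // description would be to still commit an (empty) APPEND snapshot.
        commit.commitAppend(emptyCommittable());            // hypothetical helper
    } else {
        commit.commitAppend(committableFor(producedFiles)); // hypothetical helper
    }
}
```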


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-29792) FileStoreCommitTest is unstable and may stuck

2022-10-30 Thread Caizhi Weng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng reassigned FLINK-29792:
---

Assignee: Caizhi Weng

> FileStoreCommitTest is unstable and may stuck
> -
>
> Key: FLINK-29792
> URL: https://issues.apache.org/jira/browse/FLINK-29792
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.3.0
>Reporter: Caizhi Weng
>Assignee: Caizhi Weng
>Priority: Major
>
> {{FileStoreCommitTest}} may get stuck because the {{FileStoreCommit}} in 
> {{TestCommitThread}} does not commit an APPEND snapshot when no new files are 
> produced. In this case, if the following COMPACT snapshot conflicts with the 
> current merge tree, the test will get stuck.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29728) TablePlanner prevents Flink from starting is working directory is a symbolic link

2022-10-30 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-29728.

Fix Version/s: 1.17.0
   1.16.1
   Resolution: Fixed

- master (1.17): e8e9db37e17110ff04175d2720484b34f5c4d5ba
- release-1.16: 8fd9aa63a30a6037fcad752ab74fbdd6649ca3f0

> TablePlanner prevents Flink from starting is working directory is a symbolic 
> link
> -
>
> Key: FLINK-29728
> URL: https://issues.apache.org/jira/browse/FLINK-29728
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.2
>Reporter: Angelo Kastroulis
>Assignee: Weijie Guo
>Priority: Major
> Fix For: 1.17.0, 1.16.1
>
>
> The Flink runtime throws an exception when using the table API if the working 
> directory is a symbolic link. This is the case when run on AWS EMR with Yarn. 
> There is a similar issue 
> [here|https://issues.apache.org/jira/browse/FLINK-20267] and I believe the 
> same fix applied there would work.
>  
>  
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Could not initialize 
> the table planner components loader.
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:123)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:52)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.(PlannerModule.java:131)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.DelegateExecutorFactory.(DelegateExecutorFactory.java:34)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_342]
>     at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_342]
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
> ~[?:1.8.0_342]
>     at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_342]
>     at 
> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) 
> ~[?:1.8.0_342]
>     at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) 
> ~[?:1.8.0_342]
>     at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_342]
>     at 
> org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93) 
> ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at com.ballista.Hermes.BCSE$.useLocalCatalog(BCSE.scala:210) ~[?:?]
>     at com.ballista.Hermes.BCSE$.main(BCSE.scala:114) ~[?:?]
>     at com.ballista.Hermes.BCSE.main(BCSE.scala) ~[?:?]
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_342]
>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     ... 7 more
> Caused by: java.nio.file.FileAlreadyExistsException: /tmp
>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) 
> ~[?:1.8.0_342]
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
> ~[?:1.8.0_342]
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
> ~[?:1.8.0_342]
>     at 
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
>  ~[?:1.8.0_342]
>     at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_342]
>     at java.nio.file.Files.createAndCheckIsDirectory(Files.java

[jira] [Updated] (FLINK-29728) TablePlanner prevents Flink from starting is working directory is a symbolic link

2022-10-30 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-29728:
-
Component/s: Table SQL / Planner
 (was: Runtime / Coordination)

> TablePlanner prevents Flink from starting is working directory is a symbolic 
> link
> -
>
> Key: FLINK-29728
> URL: https://issues.apache.org/jira/browse/FLINK-29728
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.2
>Reporter: Angelo Kastroulis
>Assignee: Weijie Guo
>Priority: Major
>
> The Flink runtime throws an exception when using the table API if the working 
> directory is a symbolic link. This is the case when run on AWS EMR with Yarn. 
> There is a similar issue 
> [here|https://issues.apache.org/jira/browse/FLINK-20267] and I believe the 
> same fix applied there would work.
>  
>  
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Could not initialize 
> the table planner components loader.
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:123)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:52)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.(PlannerModule.java:131)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.DelegateExecutorFactory.(DelegateExecutorFactory.java:34)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_342]
>     at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_342]
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
> ~[?:1.8.0_342]
>     at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_342]
>     at 
> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) 
> ~[?:1.8.0_342]
>     at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) 
> ~[?:1.8.0_342]
>     at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_342]
>     at 
> org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93) 
> ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at com.ballista.Hermes.BCSE$.useLocalCatalog(BCSE.scala:210) ~[?:?]
>     at com.ballista.Hermes.BCSE$.main(BCSE.scala:114) ~[?:?]
>     at com.ballista.Hermes.BCSE.main(BCSE.scala) ~[?:?]
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_342]
>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     ... 7 more
> Caused by: java.nio.file.FileAlreadyExistsException: /tmp
>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) 
> ~[?:1.8.0_342]
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
> ~[?:1.8.0_342]
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
> ~[?:1.8.0_342]
>     at 
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
>  ~[?:1.8.0_342]
>     at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_342]
>     at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) 
> ~[?:1.8.0_342]
>     at java.nio.file.Files.createDirectories(Files.java:727) ~[?:1.8.0_342]
>     at 
> org.apache.flink.table.planne

[jira] [Assigned] (FLINK-29728) TablePlanner prevents Flink from starting is working directory is a symbolic link

2022-10-30 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-29728:


Assignee: Weijie Guo

> TablePlanner prevents Flink from starting is working directory is a symbolic 
> link
> -
>
> Key: FLINK-29728
> URL: https://issues.apache.org/jira/browse/FLINK-29728
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.15.2
>Reporter: Angelo Kastroulis
>Assignee: Weijie Guo
>Priority: Major
>
> The Flink runtime throws an exception when using the table API if the working 
> directory is a symbolic link. This is the case when run on AWS EMR with Yarn. 
> There is a similar issue 
> [here|https://issues.apache.org/jira/browse/FLINK-20267] and I believe the 
> same fix applied there would work.
>  
>  
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Could not initialize 
> the table planner components loader.
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:123)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:52)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.(PlannerModule.java:131)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.planner.loader.DelegateExecutorFactory.(DelegateExecutorFactory.java:34)
>  ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_342]
>     at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_342]
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
> ~[?:1.8.0_342]
>     at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_342]
>     at 
> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) 
> ~[?:1.8.0_342]
>     at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) 
> ~[?:1.8.0_342]
>     at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_342]
>     at 
> org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276)
>  ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93) 
> ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
>     at com.ballista.Hermes.BCSE$.useLocalCatalog(BCSE.scala:210) ~[?:?]
>     at com.ballista.Hermes.BCSE$.main(BCSE.scala:114) ~[?:?]
>     at com.ballista.Hermes.BCSE.main(BCSE.scala) ~[?:?]
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> ~[?:1.8.0_342]
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:1.8.0_342]
>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
>     at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     ... 7 more
> Caused by: java.nio.file.FileAlreadyExistsException: /tmp
>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) 
> ~[?:1.8.0_342]
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
> ~[?:1.8.0_342]
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
> ~[?:1.8.0_342]
>     at 
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
>  ~[?:1.8.0_342]
>     at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_342]
>     at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) 
> ~[?:1.8.0_342]
>     at java.nio.file.Files.createDirectories(Files.java:727) ~[?:1.8.0_342]
>     at 
> org.apache.flink.table.planner.loader.PlannerModule.(PlannerModule.java:96)
>  ~[f

[jira] [Closed] (FLINK-28102) Flink AkkaRpcSystemLoader fails when temporary directory is a symlink

2022-10-30 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-28102.

Fix Version/s: 1.17.0
   1.16.1
   Resolution: Fixed

- master (1.17): 2859196f9ab1d86a3d90e47a89cbd13be74741b9
- release-1.16: 3ae578e2233abd42f770d1bf395792c85698fd89

> Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
> -
>
> Key: FLINK-28102
> URL: https://issues.apache.org/jira/browse/FLINK-28102
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / RPC
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Prabhu Joseph
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.16.1
>
>
> Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
> *Error Message:*
> {code}
> Caused by: java.nio.file.FileAlreadyExistsException: /tmp
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) 
> ~[?:1.8.0_332]
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
> ~[?:1.8.0_332]
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
> ~[?:1.8.0_332]
> at 
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
>  ~[?:1.8.0_332]
> at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_332]
> at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) 
> ~[?:1.8.0_332]
> at java.nio.file.Files.createDirectories(Files.java:727) 
> ~[?:1.8.0_332]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcSystemLoader.loadRpcSystem(AkkaRpcSystemLoader.java:58)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at org.apache.flink.runtime.rpc.RpcSystem.load(RpcSystem.java:101) 
> ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:186)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> {code}
> *Repro:*
> {code}
> 1. /tmp is a symlink points to actual directory /mnt/tmp
> [root@prabhuHost log]# ls -lrt /tmp
> lrwxrwxrwx 1 root root 8 Jun 15 07:51 /tmp -> /mnt/tmp
> 2. Start Cluster
> ./bin/start-cluster.sh
> {code}
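
For anyone reproducing this outside of Flink, a minimal sketch (not the actual patch) of 
why the JDK call fails on a symlinked /tmp, plus a symlink-tolerant variant:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Minimal sketch, not the actual Flink fix. Files.createDirectories() re-checks an
// already existing path without following links, so a symlink that points at a real
// directory is rejected with FileAlreadyExistsException (as in the stack trace above).
public class SymlinkTmpDemo {
    public static void main(String[] args) throws IOException {
        Path tmp = Paths.get("/tmp"); // symlink to /mnt/tmp in the repro above

        // Files.createDirectories(tmp); // fails with FileAlreadyExistsException here

        // Symlink-tolerant variant: only create the directory when the resolved
        // path is not already a directory (isDirectory follows symlinks by default).
        if (!Files.isDirectory(tmp)) {
            Files.createDirectories(tmp);
        }
        System.out.println("Working directory is usable: " + tmp.toRealPath());
    }
}
{code}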



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28102) Flink AkkaRpcSystemLoader fails when temporary directory is a symlink

2022-10-30 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-28102:
-
Affects Version/s: 1.15.2
   (was: 1.15.0)
   (was: 1.17.0)

> Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
> -
>
> Key: FLINK-28102
> URL: https://issues.apache.org/jira/browse/FLINK-28102
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / RPC
>Affects Versions: 1.16.0, 1.15.2
>Reporter: Prabhu Joseph
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available
>
> Flink AkkaRpcSystemLoader fails when temporary directory is a symlink
> *Error Message:*
> {code}
> Caused by: java.nio.file.FileAlreadyExistsException: /tmp
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) 
> ~[?:1.8.0_332]
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
> ~[?:1.8.0_332]
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
> ~[?:1.8.0_332]
> at 
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
>  ~[?:1.8.0_332]
> at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_332]
> at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) 
> ~[?:1.8.0_332]
> at java.nio.file.Files.createDirectories(Files.java:727) 
> ~[?:1.8.0_332]
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcSystemLoader.loadRpcSystem(AkkaRpcSystemLoader.java:58)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at org.apache.flink.runtime.rpc.RpcSystem.load(RpcSystem.java:101) 
> ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:186)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> {code}
> *Repro:*
> {code}
> 1. /tmp is a symlink points to actual directory /mnt/tmp
> [root@prabhuHost log]# ls -lrt /tmp
> lrwxrwxrwx 1 root root 8 Jun 15 07:51 /tmp -> /mnt/tmp
> 2. Start Cluster
> ./bin/start-cluster.sh
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong closed pull request #21125: [FLINK-28102] Flink AkkaRpcSystemLoader fails when temporary directory is a symlink

2022-10-30 Thread GitBox


xintongsong closed pull request #21125: [FLINK-28102] Flink AkkaRpcSystemLoader 
fails when temporary directory is a symlink
URL: https://github.com/apache/flink/pull/21125


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles

2022-10-30 Thread GitBox


lindong28 commented on code in PR #162:
URL: https://github.com/apache/flink-ml/pull/162#discussion_r1008964931


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java:
##
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Helper class to compute an approximate quantile summary. This 
implementation is based on the
+ * algorithm proposed in the paper: "Space-efficient Online Computation of 
Quantile Summaries" by
+ * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670)
+ */
+@Internal

Review Comment:
   @jiangxin369 If you need to update this PR, it might be simpler to also 
update `withBroadcast()` to follow the best practice of using `@Internal`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] lindong28 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quantiles

2022-10-30 Thread GitBox


lindong28 commented on code in PR #162:
URL: https://github.com/apache/flink-ml/pull/162#discussion_r1008964672


##
flink-ml-lib/src/main/java/org/apache/flink/ml/common/util/QuantileSummary.java:
##
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Helper class to compute an approximate quantile summary. This 
implementation is based on the
+ * algorithm proposed in the paper: "Space-efficient Online Computation of 
Quantile Summaries" by
+ * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670)
+ */
+@Internal
+public class QuantileSummary implements Serializable {
+
+/** The target relative error. */
+private final double relativeError;
+
+/**
+ * The compression threshold. After the internal buffer of statistics 
crosses this size, it
+ * attempts to compress the statistics together.
+ */
+private final int compressThreshold;
+
+/** The count of all the elements inserted to be calculated. */
+private final long count;
+
+/** A buffer of quantile statistics. */
+private final List sampled;
+
+/** The default size of head buffer. */
+private static final int DEFAULT_HEAD_SIZE = 5;
+
+/** The default compression threshold. */
+private static final int DEFAULT_COMPRESS_THRESHOLD = 1;
+
+/** A buffer of the latest samples seen so far. */
+private List headBuffer = new ArrayList<>(DEFAULT_HEAD_SIZE);
+
+/**
+ * QuantileSummary Constructor.
+ *
+ * @param relativeError The target relative error.
+ */
+public QuantileSummary(double relativeError) {
+this(relativeError, DEFAULT_COMPRESS_THRESHOLD);
+}
+
+/**
+ * QuantileSummary Constructor.
+ *
+ * @param relativeError The target relative error.
+ * @param compressThreshold the compression threshold. After the internal 
buffer of statistics
+ * crosses this size, it attempts to compress the statistics together.
+ */
+public QuantileSummary(double relativeError, int compressThreshold) {
+this(relativeError, compressThreshold, Collections.EMPTY_LIST, 0);
+}
+
+/**
+ * QuantileSummary Constructor.
+ *
+ * @param relativeError The target relative error.
+ * @param compressThreshold the compression threshold.
+ * @param sampled A buffer of quantile statistics. See the G-K article for 
more details.
+ * @param count The count of all the elements inserted in the sampled 
buffer.
+ */
+private QuantileSummary(
+double relativeError, int compressThreshold, List 
sampled, long count) {
+Preconditions.checkArgument(
+relativeError > 0 && relativeError < 1,
+"An appropriate relative error must lay between 0 and 1.");
+Preconditions.checkArgument(
+compressThreshold > 0, "An compress threshold must greater 
than 0.");
+this.relativeError = relativeError;
+this.compressThreshold = compressThreshold;
+this.sampled = sampled;
+this.count = count;
+}
+
+/**
+ * Insert a new observation to the summary.
+ *
+ * @param item The new observation to insert into the summary.
+ * @return A summary with the given observation inserted into the summary.
+ */
+public QuantileSummary insert(Double item) {
+headBuffer.add(item);
+if (headBuffer.size() >= DEFAULT_HEAD_SIZE) {
+QuantileSummary result = insertHeadBuffer();
+if (result.sampled.size() >= compressThreshold) {
+return result.compress();
+} else {
+return result;
+}
+} else {
+return this;
+}
+}
+
+/**
+ * Returns 

[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

2022-10-30 Thread GitBox


xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1296419534

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #17649: [FLINK-24742][table][docs] Add info about SQL client key strokes to docs

2022-10-30 Thread GitBox


snuyanzin commented on code in PR #17649:
URL: https://github.com/apache/flink/pull/17649#discussion_r1008952683


##
docs/content/docs/dev/table/sqlClient.md:
##
@@ -91,6 +91,40 @@ The `SET` command allows you to tune the job execution and 
the sql client behavi
 After a query is defined, it can be submitted to the cluster as a 
long-running, detached Flink job.
 The [configuration section](#configuration) explains how to declare table 
sources for reading data, how to declare table sinks for writing data, and how 
to configure other table program properties.
 
+### Key-strokes
+
+There is a list of available key-strokes in SQL client
+
+| Key-Stroke (Linux, Windows(WSL)) | Key-Stroke (Mac) | Description |
+|:-|--|:---|
+| `alt-b`  | `Esc-b`  | Backward word |
+| `alt-f`  | `Esc-f`  | Forward word |
+| `alt-c`  | `Esc-c`  | Capitalize word |
+| `alt-l`  | `Esc-l`  | Lowercase word |
+| `alt-u`  | `Esc-u`  | Uppercase word |
+| `alt-d`  | `Esc-d`  | Kill word |
+| `alt-n`  | `Esc-n`  | History search forward |
+| `alt-p`  | `Esc-p`  | History search backward |

Review Comment:
   I added this clarification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #17649: [FLINK-24742][table][docs] Add info about SQL client key strokes to docs

2022-10-30 Thread GitBox


snuyanzin commented on code in PR #17649:
URL: https://github.com/apache/flink/pull/17649#discussion_r1008952635


##
docs/content/docs/dev/table/sqlClient.md:
##
@@ -91,6 +91,40 @@ The `SET` command allows you to tune the job execution and 
the sql client behavi
 After a query is defined, it can be submitted to the cluster as a 
long-running, detached Flink job.
 The [configuration section](#configuration) explains how to declare table 
sources for reading data, how to declare table sinks for writing data, and how 
to configure other table program properties.
 
+### Key-strokes
+
+There is a list of available key-strokes in SQL client
+
+| Key-Stroke (Linux, Windows(WSL)) | Key-Stroke (Mac) | Description |
+|:-|--|:---|
+| `alt-b`  | `Esc-b`  | Backward word |
+| `alt-f`  | `Esc-f`  | Forward word |

Review Comment:
   thanks, added this as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #17649: [FLINK-24742][table][docs] Add info about SQL client key strokes to docs

2022-10-30 Thread GitBox


snuyanzin commented on code in PR #17649:
URL: https://github.com/apache/flink/pull/17649#discussion_r1008951197


##
docs/content/docs/dev/table/sqlClient.md:
##
@@ -91,6 +91,40 @@ The `SET` command allows you to tune the job execution and 
the sql client behavi
 After a query is defined, it can be submitted to the cluster as a 
long-running, detached Flink job.
 The [configuration section](#configuration) explains how to declare table 
sources for reading data, how to declare table sinks for writing data, and how 
to configure other table program properties.
 
+### Key-strokes
+
+There is a list of available key-strokes in SQL client
+
+| Key-Stroke (Linux, Windows(WSL)) | Key-Stroke (Mac) | Description |
+|:-|--|:---|
+| `alt-b`  | `Esc-b`  | Backward word |
+| `alt-f`  | `Esc-f`  | Forward word |
+| `alt-c`  | `Esc-c`  | Capitalize word |
+| `alt-l`  | `Esc-l`  | Lowercase word |
+| `alt-u`  | `Esc-u`  | Uppercase word |
+| `alt-d`  | `Esc-d`  | Kill word |
+| `alt-n`  | `Esc-n`  | History search forward |
+| `alt-p`  | `Esc-p`  | History search backward |

Review Comment:
   It's not 100% the same.
   `history search forward/backward` behaves the same as `Up/line from history` when 
there is no input provided.
   However, if there is non-empty input, it navigates to the first 
occurrence forward/backward that contains the input.
   That's the difference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1296359376

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] salvalcantara commented on pull request #21186: [FLINK-29480][Connector/Kafka] Skip null records when writing

2022-10-30 Thread GitBox


salvalcantara commented on PR #21186:
URL: https://github.com/apache/flink/pull/21186#issuecomment-1296333613

   Initially, the CI pipeline passed successfully. I then made some cosmetic changes 
and got a failure which seems unrelated to my code changes. Can 
you confirm, @MartijnVisser @mas-chen @leonardBang @PatrickRen?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739


##
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return 
Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+extends ExpressionDefaultVisitor> {
+
+private Function quoteIdentifierFunction;
+
+private static final Set> SUPPORTED_DATA_TYPES;
+
+static {
+SUPPORTED_DATA_TYPES = new HashSet<>();
+SUPPORTED_DATA_TYPES.add(IntType.class);
+SUPPORTED_DATA_TYPES.add(BigIntType.class);
+SUPPORTED_DATA_TYPES.add(BooleanType.class);
+SUPPORTED_DATA_TYPES.add(DecimalType.class);
+SUPPORTED_DATA_TYPES.add(DoubleType.class);
+SUPPORTED_DATA_TYPES.add(FloatType.class);
+SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+SUPPORTED_DATA_TYPES.add(VarCharType.class);
+SUPPORTED_DATA_TYPES.add(TimestampType.class);
+SUPPORTED_DATA_TYPES.add(DateType.class);
+SUPPORTED_DATA_TYPES.add(TimeType.class);
+}
+
+public JdbcFilterPushdownPreparedStatementVisitor(
+Function quoteIdentifierFunction) {
+this.quoteIdentifierFunction = quoteIdentifierFunction;
+}
+
+@Override
+public Optional visit(CallExpression call) {
+if 
(BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("<", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition()))
 {
+return renderBinaryOperator("<=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator(">", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition()))
 {
+return renderBinaryOperator(">=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("<>", call.getResolvedCh

[GitHub] [flink] salvalcantara commented on pull request #21186: [FLINK-29480][Connectors / Kafka] Skip null records when writing

2022-10-30 Thread GitBox


salvalcantara commented on PR #21186:
URL: https://github.com/apache/flink/pull/21186#issuecomment-1296303288

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29799) How to override ConfigMap values while deploying the Operator via OLM?

2022-10-30 Thread Cansu Kavili (Jira)
Cansu Kavili created FLINK-29799:


 Summary: How to override ConfigMap values while deploying the 
Operator via OLM?
 Key: FLINK-29799
 URL: https://issues.apache.org/jira/browse/FLINK-29799
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Cansu Kavili


Hi,

Flink Kubernetes Operator is great - thank you! I deployed it via OLM on 
OpenShift 4.10 and would like to override some config from the 
`flink-operator-config` CM. When I override a value manually, it persists 
(which is strange, since it's a resource managed by the Operator), so when I change a 
value in the CM and restart the Operator, it works. How can I provide the parameters I 
want while installing the operator? Some operators support setting environment 
variables in the `Subscription` object, but I couldn't find such a thing in the 
documentation. I can do it easily with a Helm installation, but I want to use OLM. 
It'd be appreciated if you could guide me. Many thanks!
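
For context, a sketch of the generic OLM mechanism referred to above: Subscriptions can 
inject environment variables into the operator Deployment via `spec.config.env`. Whether 
the Flink Kubernetes Operator actually maps such variables onto its configuration (rather 
than only reading `flink-operator-config`) is exactly the open question here; channel, 
catalog source and variable names below are placeholders:

{code:yaml}
# Sketch only: generic OLM Subscription env injection; values are placeholders.
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: flink-kubernetes-operator
  namespace: openshift-operators
spec:
  name: flink-kubernetes-operator
  channel: alpha                       # placeholder
  source: community-operators          # placeholder
  sourceNamespace: openshift-marketplace
  config:
    env:
      - name: EXAMPLE_OVERRIDE         # hypothetical variable name
        value: "example-value"
{code}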



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21195: [IN_PROGRESS] Remove "flink-sql-parser-hive" from "flink-table"

2022-10-30 Thread GitBox


flinkbot commented on PR #21195:
URL: https://github.com/apache/flink/pull/21195#issuecomment-1296283103

   
   ## CI report:
   
   * bbfcc159bebcec8ad634c48ee8f39e975640696a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ferenc-csaky opened a new pull request, #21195: [IN_PROGRESS] Remove "flink-sql-parser-hive" from "flink-table"

2022-10-30 Thread GitBox


ferenc-csaky opened a new pull request, #21195:
URL: https://github.com/apache/flink/pull/21195

   ## What is the purpose of the change
   
   This is the updated version of https://github.com/apache/flink/pull/19077.
   
   Removes the `flink-sql-parser-hive` module from `flink-table` to decouple 
Hive from the table planner module, which is required to externalize the 
connector.
   
   ## Brief change log
   
   ...
   
   ## Verifying this change
   
   ...
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mbalassi commented on pull request #21189: [FLINK-24119][tests] Add random to Kafka tests topic name

2022-10-30 Thread GitBox


mbalassi commented on PR #21189:
URL: https://github.com/apache/flink/pull/21189#issuecomment-1296277519

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mbalassi commented on pull request #21180: [FLINK-29783][tests] Add some random to KafkaShuffleExactlyOnceITCase topic names

2022-10-30 Thread GitBox


mbalassi commented on PR #21180:
URL: https://github.com/apache/flink/pull/21180#issuecomment-1296277420

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29798) Rename K8s operator client code module

2022-10-30 Thread Jira
Márton Balassi created FLINK-29798:
--

 Summary: Rename K8s operator client code module
 Key: FLINK-29798
 URL: https://issues.apache.org/jira/browse/FLINK-29798
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.2.0
Reporter: Márton Balassi
Assignee: Márton Balassi
 Fix For: kubernetes-operator-1.3.0


The example code module in the k8s operator is named simply 
kubernetes-client-examples, and thus is published like so:

[https://repo1.maven.org/maven2/org/apache/flink/kubernetes-client-examples/1.2.0/]

We should make this more specific.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1296273034

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1296238300

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739


##
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that converts an Expression to a ParameterizedPredicate. Returns 
Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+extends ExpressionDefaultVisitor> {
+
+private Function quoteIdentifierFunction;
+
+private static final Set> SUPPORTED_DATA_TYPES;
+
+static {
+SUPPORTED_DATA_TYPES = new HashSet<>();
+SUPPORTED_DATA_TYPES.add(IntType.class);
+SUPPORTED_DATA_TYPES.add(BigIntType.class);
+SUPPORTED_DATA_TYPES.add(BooleanType.class);
+SUPPORTED_DATA_TYPES.add(DecimalType.class);
+SUPPORTED_DATA_TYPES.add(DoubleType.class);
+SUPPORTED_DATA_TYPES.add(FloatType.class);
+SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+SUPPORTED_DATA_TYPES.add(VarCharType.class);
+SUPPORTED_DATA_TYPES.add(TimestampType.class);
+SUPPORTED_DATA_TYPES.add(DateType.class);
+SUPPORTED_DATA_TYPES.add(TimeType.class);
+}
+
+public JdbcFilterPushdownPreparedStatementVisitor(
+Function quoteIdentifierFunction) {
+this.quoteIdentifierFunction = quoteIdentifierFunction;
+}
+
+@Override
+public Optional visit(CallExpression call) {
+if 
(BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("<", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition()))
 {
+return renderBinaryOperator("<=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator(">", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition()))
 {
+return renderBinaryOperator(">=", call.getResolvedChildren());
+}
+if 
(BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+return renderBinaryOperator("<>", call.getResolvedCh

[GitHub] [flink] qingwei91 commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


qingwei91 commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008835739


##
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    private static final Set<Class<? extends LogicalType>> SUPPORTED_DATA_TYPES;
+
+    static {
+        SUPPORTED_DATA_TYPES = new HashSet<>();
+        SUPPORTED_DATA_TYPES.add(IntType.class);
+        SUPPORTED_DATA_TYPES.add(BigIntType.class);
+        SUPPORTED_DATA_TYPES.add(BooleanType.class);
+        SUPPORTED_DATA_TYPES.add(DecimalType.class);
+        SUPPORTED_DATA_TYPES.add(DoubleType.class);
+        SUPPORTED_DATA_TYPES.add(FloatType.class);
+        SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+        SUPPORTED_DATA_TYPES.add(VarCharType.class);
+        SUPPORTED_DATA_TYPES.add(TimestampType.class);
+        SUPPORTED_DATA_TYPES.add(DateType.class);
+        SUPPORTED_DATA_TYPES.add(TimeType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());

[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1296222402

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21194: Update fault_tolerance.md

2022-10-30 Thread GitBox


flinkbot commented on PR #21194:
URL: https://github.com/apache/flink/pull/21194#issuecomment-1296219332

   
   ## CI report:
   
   * ff2dc1314335fa68e043980a5a4f1827803ab96b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] chandan1602 opened a new pull request, #21194: Update fault_tolerance.md

2022-10-30 Thread GitBox


chandan1602 opened a new pull request, #21194:
URL: https://github.com/apache/flink/pull/21194

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-30 Thread GitBox


libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1008821587


##
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Visitor that convert Expression to ParameterizedPredicate. Return Optional.empty() if we cannot
+ * push down the filter.
+ */
+@Experimental
+public class JdbcFilterPushdownPreparedStatementVisitor
+        extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {
+
+    private Function<String, String> quoteIdentifierFunction;
+
+    private static final Set<Class<? extends LogicalType>> SUPPORTED_DATA_TYPES;
+
+    static {
+        SUPPORTED_DATA_TYPES = new HashSet<>();
+        SUPPORTED_DATA_TYPES.add(IntType.class);
+        SUPPORTED_DATA_TYPES.add(BigIntType.class);
+        SUPPORTED_DATA_TYPES.add(BooleanType.class);
+        SUPPORTED_DATA_TYPES.add(DecimalType.class);
+        SUPPORTED_DATA_TYPES.add(DoubleType.class);
+        SUPPORTED_DATA_TYPES.add(FloatType.class);
+        SUPPORTED_DATA_TYPES.add(SmallIntType.class);
+        SUPPORTED_DATA_TYPES.add(VarCharType.class);
+        SUPPORTED_DATA_TYPES.add(TimestampType.class);
+        SUPPORTED_DATA_TYPES.add(DateType.class);
+        SUPPORTED_DATA_TYPES.add(TimeType.class);
+    }
+
+    public JdbcFilterPushdownPreparedStatementVisitor(
+            Function<String, String> quoteIdentifierFunction) {
+        this.quoteIdentifierFunction = quoteIdentifierFunction;
+    }
+
+    @Override
+    public Optional<ParameterizedPredicate> visit(CallExpression call) {
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator(">=", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            return renderBinaryOperator("<>", call.getResolvedChildren());

[GitHub] [flink-kubernetes-operator] mbalassi commented on pull request #417: [FLINK-29655] Split Flink CRD from flink-kubernetes-operator module

2022-10-30 Thread GitBox


mbalassi commented on PR #417:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/417#issuecomment-1296153291

   @tweise has pointed out to me that we actually do not guarantee Java API 
compatibility at this point, and consequently it is best to make the necessary 
changes now so that we can provide that guarantee later.
   
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/compatibility/#java-api-compatibility


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xintongsong commented on pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization

2022-10-30 Thread GitBox


xintongsong commented on PR #21122:
URL: https://github.com/apache/flink/pull/21122#issuecomment-1296144735

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-29480) Skip invalid messages when writing

2022-10-30 Thread Salva (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17626176#comment-17626176
 ] 

Salva edited comment on FLINK-29480 at 10/30/22 7:08 AM:
-

[~mason6345] [~leonard] [~renqs] [~martijnvisser] I think the PR is ready for 
review. Can you please take a look?


was (Author: JIRAUSER287051):
[~mason6345] [~leonard] [~renqs] [~martijnvisser] From my perspective, the PR 
is ready for review. Can you please take a look?

> Skip invalid messages when writing
> --
>
> Key: FLINK-29480
> URL: https://issues.apache.org/jira/browse/FLINK-29480
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Salva
>Assignee: Salva
>Priority: Minor
>  Labels: pull-request-available
> Attachments: Screenshot 2022-10-28 at 13.48.12.png
>
>
> As reported in [1], it seems that it's not possible to skip invalid messages 
> when writing. More specifically, if there is an error serializing messages, 
> there is no option for skipping them and the Flink job then enters a crash loop. 
> In particular, the `write` method of the `KafkaWriter` looks like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>   final ProducerRecord<byte[], byte[]> record = recordSerializer.serialize(element, ...);
>   currentProducer.send(record, deliveryCallback); // line 200
>   numRecordsSendCounter.inc();
> } {code}
> So, if you make your `serialize` method return `null`, this is what you get 
> at runtime:
> {code:java}
> java.lang.NullPointerException at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885) 
> at 
> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200) 
> at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
>   {code}
> What I propose is to modify the KafkaWriter [2, 3] like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>   final ProducerRecord<byte[], byte[]> record = recordSerializer.serialize(element, ...);
>   if (record != null) { // skip null records (check to be added)
>     currentProducer.send(record, deliveryCallback);
>     numRecordsSendCounter.inc();
>   }
> } {code}
> This would at least give a chance of skipping those messages and moving on to 
> the next ones.
> Obviously, one could prepend the sink with a flatMap operator for filtering 
> out invalid messages, but
>  # It looks weird that one has to prepend an operator for "making sure" that 
> the serializer will not fail right after. Wouldn't it be simpler to skip the 
> null records directly in order to avoid this pre-check? [4]
>  # It's such a simple change (apparently)
>  # Brings consistency/symmetry with the reading case [4, 5]
> To expand on point 3, by looking at `KafkaDeserializationSchema`:
> {code:java}
> T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
> default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
>   T deserialized = deserialize(message);
>   if (deserialized != null) { // skip null records (check already exists)
>     out.collect(deserialized);
>   }
> }  {code}
> one can simply return `null` in the overridden `deserialize` method in order 
> to skip any message that fails to be deserialized. Similarly, if one uses the 
> `KafkaRecordDeserializationSchema` interface instead:
> {code:java}
> void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException {code}
> then it's also possible not to invoke `out.collect(...)` on null records. To 
> me, it looks strange that the same flexibility is not given in the writing 
> case.
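> For illustration, here is a minimal sketch (not part of the original proposal) of a 
> serialization schema that returns `null` for elements it cannot serialize, which the 
> proposed check in `KafkaWriter#write` would then skip. The class name, topic name and 
> element type below are made up:
> {code:java}
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import java.nio.charset.StandardCharsets;
>
> // Hypothetical schema: invalid (here, empty) elements are mapped to null.
> public class SkippingSerializationSchema
>     implements KafkaRecordSerializationSchema<String> {
>
>   @Override
>   public ProducerRecord<byte[], byte[]> serialize(
>       String element, KafkaSinkContext context, Long timestamp) {
>     if (element == null || element.isEmpty()) {
>       return null; // skipped by the writer once the null check is in place
>     }
>     return new ProducerRecord<>(
>         "output-topic", element.getBytes(StandardCharsets.UTF_8));
>   }
> } {code}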
> *References*
> [1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]
> [2] 
> [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143]
>  
> [3] 
> [https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197]
>  
> [4] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d] 
> [5] 
> [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

