[jira] [Updated] (FLINK-30118) Migrate DDB connector Integration Tests/ITCase to E2E module
[ https://issues.apache.org/jira/browse/FLINK-30118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-30118: -- Fix Version/s: aws-connector-3.0.0 (was: aws-connector-2.0.0) > Migrate DDB connector Integration Tests/ITCase to E2E module > > > Key: FLINK-30118 > URL: https://issues.apache.org/jira/browse/FLINK-30118 > Project: Flink > Issue Type: Improvement > Components: Connectors / DynamoDB >Reporter: Daren Wong >Priority: Major > Fix For: aws-connector-3.0.0 > > > Currently the DDB connector [ITCase|https://github.com/apache/flink-connector-aws/blob/53ea41008910237073804dc090d67a1e0852163d/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java#L77] is implemented so that it starts a [DDB docker image|https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDbContainer.java] and runs through several test scenarios on it. > The proposal is to move this ITCase to an e2e test that will be run as part of GitHub Actions. This will help speed up Maven builds without sacrificing integration/e2e testing to ensure quality. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29908) Externalize and configure E2E tests
[ https://issues.apache.org/jira/browse/FLINK-29908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-29908: -- Parent: (was: FLINK-29438) Issue Type: Improvement (was: Sub-task) > Externalize and configure E2E tests > --- > > Key: FLINK-29908 > URL: https://issues.apache.org/jira/browse/FLINK-29908 > Project: Flink > Issue Type: Improvement >Reporter: Danny Cranmer >Priority: Major > > Migrate Amazon Kinesis and Firehose E2E test modules from Flink core to flink-connector-aws
[jira] [Updated] (FLINK-29908) Externalize and configure E2E tests
[ https://issues.apache.org/jira/browse/FLINK-29908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-29908: -- Fix Version/s: aws-connector-3.0.0 > Externalize and configure E2E tests > --- > > Key: FLINK-29908 > URL: https://issues.apache.org/jira/browse/FLINK-29908 > Project: Flink > Issue Type: Improvement >Reporter: Danny Cranmer >Priority: Major > Fix For: aws-connector-3.0.0 > > > Migrate Amazon Kinesis and Firehose E2E test modules from Flink core to flink-connector-aws
[jira] [Updated] (FLINK-29908) Externalize and configure E2E tests
[ https://issues.apache.org/jira/browse/FLINK-29908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-29908: -- Component/s: Connectors / AWS > Externalize and configure E2E tests > --- > > Key: FLINK-29908 > URL: https://issues.apache.org/jira/browse/FLINK-29908 > Project: Flink > Issue Type: Improvement > Components: Connectors / AWS >Reporter: Danny Cranmer >Priority: Major > Fix For: aws-connector-3.0.0 > > > Migrate Amazon Kinesis and Firehose E2E test modules from Flink core to flink-connector-aws
[jira] [Updated] (FLINK-29907) Externalize AWS connectors from Flink core
[ https://issues.apache.org/jira/browse/FLINK-29907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-29907: -- Issue Type: Improvement (was: New Feature) > Externalize AWS connectors from Flink core > -- > > Key: FLINK-29907 > URL: https://issues.apache.org/jira/browse/FLINK-29907 > Project: Flink > Issue Type: Improvement > Components: Connectors / AWS >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Fix For: aws-connector-3.0.0 > > > Externalize the following modules from Flink core to the connectors repo: > - {{flink-connector-aws-base}} > - {{flink-connector-kinesis}} > - {{flink-connector-sql-kinesis}} > - {{flink-connector-aws-kinesis-streams}} > - {{flink-connector-sql-aws-kinesis-streams}} > - {{flink-connector-aws-kinesis-firehose}} > - {{flink-connector-sql-aws-kinesis-firehose}}
[jira] [Updated] (FLINK-29907) Externalize AWS connectors from Flink core
[ https://issues.apache.org/jira/browse/FLINK-29907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-29907: -- Parent: (was: FLINK-29438) Issue Type: New Feature (was: Sub-task) > Externalize AWS connectors from Flink core > -- > > Key: FLINK-29907 > URL: https://issues.apache.org/jira/browse/FLINK-29907 > Project: Flink > Issue Type: New Feature > Components: Connectors / AWS >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Fix For: aws-connector-3.0.0 > > > Externalize the following modules from Flink core to the connectors repo: > - {{flink-connector-aws-base}} > - {{flink-connector-kinesis}} > - {{flink-connector-sql-kinesis}} > - {{flink-connector-aws-kinesis-streams}} > - {{flink-connector-sql-aws-kinesis-streams}} > - {{flink-connector-aws-kinesis-firehose}} > - {{flink-connector-sql-aws-kinesis-firehose}}
[jira] [Commented] (FLINK-28991) Add documentation for lookup table caching feature
[ https://issues.apache.org/jira/browse/FLINK-28991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637602#comment-17637602 ] Matthias Pohl commented on FLINK-28991: --- [~renqs] what's the status of this issue? It feels like we shouldn't have closed the parent issue if there's still an open subtask. > Add documentation for lookup table caching feature > -- > > Key: FLINK-28991 > URL: https://issues.apache.org/jira/browse/FLINK-28991 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Affects Versions: 1.16.0 >Reporter: Qingsheng Ren >Assignee: Qingsheng Ren >Priority: Major > > We need documentation describing how to implement a lookup table based on the new caching framework
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update
JingsongLi commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030088540 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java: ## @@ -48,8 +48,17 @@ public KeyValue getResult() { return latestKv; } -@Override -public MergeFunction copy() { -return new DeduplicateMergeFunction(); +public static MergeFunctionFactory factory() { +return new Factory(); +} + +private static class Factory implements MergeFunctionFactory { + +private static final long serialVersionUID = 1L; + +@Override +public MergeFunction create(@Nullable int[][] projection) { +return new DeduplicateMergeFunction(); Review Comment: Actually, this class does not require projection-related information. In `PartialUpdateMergeFunction`, validation is not required either, because the `Project` class will throw an exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update
JingsongLi commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030087733 ## flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java: ## @@ -65,8 +65,10 @@ public void testMergeRead() { batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))"); batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')"); -List result = batchSql("SELECT * FROM T"); -assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6")); +assertThat(batchSql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6")); + +// projection +assertThat(batchSql("SELECT a FROM T")).containsExactlyInAnyOrder(Row.of(4)); Review Comment: At present, we do not support nested pushdown, so there is no difference between arrays/maps and ordinary fields. ## flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java: ## @@ -711,6 +710,10 @@ public void testMergeInMemory() { (long) 10101000, (float) 0, 1.11)); + +// projection +assertThat(batchSql("SELECT f,e FROM T1")) Review Comment: Ditto.
[jira] [Resolved] (FLINK-30144) Guarantee Flink ML operators function correctly with object-reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-30144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin resolved FLINK-30144. -- Resolution: Fixed > Guarantee Flink ML operators function correctly with object-reuse enabled > - > > Key: FLINK-30144 > URL: https://issues.apache.org/jira/browse/FLINK-30144 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Affects Versions: ml-2.1.0 >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > > Flink ML operators are supposed to function correctly when object reuse is enabled, as part of Flink ML's performance improvements. Thus we need to add this configuration to Flink ML test cases and fix any bugs discovered along the way.
[GitHub] [flink-ml] lindong28 commented on pull request #179: [FLINK-30144] Enable object reuse in Flink ML tests
lindong28 commented on PR #179: URL: https://github.com/apache/flink-ml/pull/179#issuecomment-1324624261 Thanks for the PR! LGTM.
[GitHub] [flink-ml] lindong28 merged pull request #179: [FLINK-30144] Enable object reuse in Flink ML tests
lindong28 merged PR #179: URL: https://github.com/apache/flink-ml/pull/179
[jira] [Assigned] (FLINK-30144) Guarantee Flink ML operators function correctly with object-reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-30144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin reassigned FLINK-30144: Assignee: Yunfeng Zhou > Guarantee Flink ML operators function correctly with object-reuse enabled > - > > Key: FLINK-30144 > URL: https://issues.apache.org/jira/browse/FLINK-30144 > Project: Flink > Issue Type: Improvement > Components: Library / Machine Learning >Affects Versions: ml-2.1.0 >Reporter: Yunfeng Zhou >Assignee: Yunfeng Zhou >Priority: Major > Labels: pull-request-available > > Flink ML operators are supposed to function correctly when object reuse is enabled, as part of Flink ML's performance improvements. Thus we need to add this configuration to Flink ML test cases and fix any bugs discovered along the way.
[GitHub] [flink] lincoln-lil commented on pull request #20745: [FLINK-28988] Don't push filters down into the right table for temporal join
lincoln-lil commented on PR #20745: URL: https://github.com/apache/flink/pull/20745#issuecomment-1324621288 > @shuiqiangchen another case we need to consider: for an upsert source, e.g., upsert-kafka, 1. without this (partial) filter pushdown optimization, when the upsert key of the source is the same as the join key, the execution plan can be optimized to upsert mode (does not require UB messages) 2. while after this optimization, the pushed-down filter will instead degrade the upsert mode to retract mode, and the corresponding upsert-kafka source will add an expensive materialization node `ChangelogNormalize` to keep correctness (more details in FLINK-9528) Currently it is hard to tell which of these two choices may be better for all use cases. And for the 2nd pattern, if we try to de-optimize in this bad pushdown case, the filter pull-up will make things more complicated; this is the biggest concern.
[GitHub] [flink] lincoln-lil commented on a diff in pull request #20745: [FLINK-28988] Don't push filters down into the right table for temporal join
lincoln-lil commented on code in PR #20745: URL: https://github.com/apache/flink/pull/20745#discussion_r1030076295 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java: ## @@ -128,7 +129,9 @@ protected void perform(RelOptRuleCall call, Filter filter, Join join) { joinType, true, !joinType.generatesNullsOnLeft(), -!joinType.generatesNullsOnRight(), +!joinType.generatesNullsOnRight() +&& !TemporalJoinUtil.containsInitialTemporalJoinCondition( Review Comment: I think we can only deal with the event time temporal join since processing time temporal join is unsupported for now.
[GitHub] [flink] shuiqiangchen commented on pull request #20745: [FLINK-28988] Don't push filters down into the right table for temporal join
shuiqiangchen commented on PR #20745: URL: https://github.com/apache/flink/pull/20745#issuecomment-1324613690 Hi @lincoln-lil, I have updated the PR, please have a look. The overall implementation is fairly straightforward but can cover most cases. Please point it out if there is any omission.
[GitHub] [flink] shuiqiangchen commented on a diff in pull request #20745: [FLINK-28988] Don't push filters down into the right table for temporal join
shuiqiangchen commented on code in PR #20745: URL: https://github.com/apache/flink/pull/20745#discussion_r1030072896 ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterJoinRule.java: ## @@ -128,7 +129,9 @@ protected void perform(RelOptRuleCall call, Filter filter, Join join) { joinType, true, !joinType.generatesNullsOnLeft(), -!joinType.generatesNullsOnRight(), +!joinType.generatesNullsOnRight() +&& !TemporalJoinUtil.containsInitialTemporalJoinCondition( Review Comment: Thanks! I added a `FlinkJoinReduceExpressionsRule` based on `JoinReduceExpressionsRule` as you suggested and skipped the constant reduction when it is a temporal join; it solves the case. One more question: it seems processing time temporal join is not supported yet, so do we only consider event time temporal join in this PR?
[jira] [Created] (FLINK-30161) Add TableSchema validation before it is committed
Shammon created FLINK-30161: --- Summary: Add TableSchema validation before it is committed Key: FLINK-30161 URL: https://issues.apache.org/jira/browse/FLINK-30161 Project: Flink Issue Type: Improvement Components: Table Store Affects Versions: table-store-0.3.0 Reporter: Shammon When creating or altering a table, there may be some configuration or DDL conflicts, so we need to check them before committing the table schema
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update
SteNicholas commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030044691 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java: ## @@ -48,8 +48,17 @@ public KeyValue getResult() { return latestKv; } -@Override -public MergeFunction copy() { -return new DeduplicateMergeFunction(); +public static MergeFunctionFactory factory() { +return new Factory(); +} + +private static class Factory implements MergeFunctionFactory { + +private static final long serialVersionUID = 1L; + +@Override +public MergeFunction create(@Nullable int[][] projection) { +return new DeduplicateMergeFunction(); Review Comment: Does this need a comment noting that nested projection is unsupported?
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update
SteNicholas commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030044159 ## flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java: ## @@ -711,6 +710,10 @@ public void testMergeInMemory() { (long) 10101000, (float) 0, 1.11)); + +// projection +assertThat(batchSql("SELECT f,e FROM T1")) Review Comment: Does this cover the array and list projection?
[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update
SteNicholas commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030043673 ## flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java: ## @@ -65,8 +65,10 @@ public void testMergeRead() { batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))"); batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')"); -List result = batchSql("SELECT * FROM T"); -assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6")); +assertThat(batchSql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6")); + +// projection +assertThat(batchSql("SELECT a FROM T")).containsExactlyInAnyOrder(Row.of(4)); Review Comment: Does this cover the array and list projection?
[jira] [Closed] (FLINK-30103) Test InputFormatCacheLoaderTest.checkCounter failed due to unexpected value on azure
[ https://issues.apache.org/jira/browse/FLINK-30103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl closed FLINK-30103. - Resolution: Duplicate I guess [~pnowojski] is right. I went through the failed builds of FLINK-29405 and FLINK-30103. They are all failing with the same error. There is a slight diff between the stacktraces, as Piotr pointed out, which is negligible. I'm closing this issue in favor of FLINK-29405. > Test InputFormatCacheLoaderTest.checkCounter failed due to unexpected value > on azure > > > Key: FLINK-30103 > URL: https://issues.apache.org/jira/browse/FLINK-30103 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.0, 1.17.0 >Reporter: Leonard Xu >Priority: Major > Labels: test-stability > > {noformat} > Nov 20 02:43:43 [ERROR] Failures: > Nov 20 02:43:43 [ERROR] InputFormatCacheLoaderTest.checkCounter:74 > Nov 20 02:43:43 Expecting AtomicInteger(0) to have value: > Nov 20 02:43:43 0 > Nov 20 02:43:43 but did not > {noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43319=logs=de826397-1924-5900-0034-51895f69d4b7=f311e913-93a2-5a37-acab-4a63e1328f94
[GitHub] [flink] SmirAlex commented on pull request #21365: [FLINK-29427] Fix unstable test LookupJoinITCase by compiling Projection class in advance using userCodeClassLoader
SmirAlex commented on PR #21365: URL: https://github.com/apache/flink/pull/21365#issuecomment-1324532462 cc @PatrickRen @leonardBang @zentol Can you please review this fix?
[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update
tsreaper commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030008452 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java: ## @@ -78,7 +78,7 @@ private CompactResult rewriteFullCompaction(List> sections) thro runReaders, keyComparator, new FullChangelogMergeFunctionWrapper( -mergeFunction.copy(), maxLevel)); +mfFactory.create(null), maxLevel)); Review Comment: Add a default implementation in the interface. ```java default MergeFunction create() { return create(null); } ```
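The default-method suggestion in the review above can be sketched in isolation. The interfaces below are simplified stand-ins for the table-store types, not the real API; only the shape of the `default create()` delegation is taken from the review comment:

```java
import java.io.Serializable;

public class MergeFunctionFactoryDemo {
    // Simplified stand-in for the table-store MergeFunction interface.
    interface MergeFunction {
        String name();
    }

    // Factory with the default no-projection create() proposed in the review:
    // call sites that need no projection can simply write factory.create().
    interface MergeFunctionFactory extends Serializable {
        MergeFunction create(int[][] projection);

        default MergeFunction create() {
            return create(null);
        }
    }

    public static void main(String[] args) {
        // A deduplicate-style factory that ignores the projection argument.
        MergeFunctionFactory factory = projection -> () -> "deduplicate";
        // Both call forms produce an equivalent merge function.
        System.out.println(factory.create().name());
        System.out.println(factory.create(new int[][] {{0}}).name());
    }
}
```

The default method keeps existing no-projection call sites (like the compact rewriter in the diff) compact without forcing them to pass `null` explicitly.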
[jira] [Updated] (FLINK-30110) Enable from-timestamp log scan when timestamp-millis is configured
[ https://issues.apache.org/jira/browse/FLINK-30110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30110: --- Labels: pull-request-available (was: ) > Enable from-timestamp log scan when timestamp-millis is configured > -- > > Key: FLINK-30110 > URL: https://issues.apache.org/jira/browse/FLINK-30110 > Project: Flink > Issue Type: Improvement > Components: Table Store >Reporter: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > >
[GitHub] [flink-table-store] SteNicholas opened a new pull request, #398: [FLINK-30110] Enable from-timestamp log scan when timestamp-millis is configured
SteNicholas opened a new pull request, #398: URL: https://github.com/apache/flink-table-store/pull/398 When `log.scan.timestamp-millis` is configured and `log.scan` isn't configured, `log.scan` should be set to `from-timestamp` by default. **The brief change log** - `FileStoreTableFactory#validateOptions` should validate the case where `log.scan.timestamp-millis` is configured.
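The defaulting rule described in the PR above can be sketched in plain Java. The option keys mirror the description, but the validation method and error message are hypothetical; the actual `FileStoreTableFactory#validateOptions` logic may differ:

```java
import java.util.HashMap;
import java.util.Map;

public class ScanOptionDefaulting {
    // If log.scan.timestamp-millis is set but log.scan is not, default
    // log.scan to "from-timestamp"; reject a contradictory explicit mode.
    static Map<String, String> normalize(Map<String, String> options) {
        Map<String, String> result = new HashMap<>(options);
        boolean hasTimestamp = options.containsKey("log.scan.timestamp-millis");
        String scanMode = options.get("log.scan");
        if (hasTimestamp && scanMode == null) {
            result.put("log.scan", "from-timestamp");
        } else if (hasTimestamp && !"from-timestamp".equals(scanMode)) {
            throw new IllegalArgumentException(
                    "log.scan.timestamp-millis requires log.scan=from-timestamp, but was: "
                            + scanMode);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> opts = new HashMap<>();
        opts.put("log.scan.timestamp-millis", "1668000000000");
        System.out.println(normalize(opts).get("log.scan"));
    }
}
```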
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update
JingsongLi commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030001343 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java: ## @@ -246,4 +245,65 @@ public List keyFields(TableSchema schema) { return addKeyNamePrefix(schema.trimmedPrimaryKeysFields()); } } + +private static class PartialUpdateMergeFunctionFactory Review Comment: I will create a factory for each merge function.
[GitHub] [flink] flinkbot commented on pull request #21367: [FLINK-28741][hive] fix unexpected behavior for cast string to boolean
flinkbot commented on PR #21367: URL: https://github.com/apache/flink/pull/21367#issuecomment-1324514277 ## CI report: * 94860af2fa21560e9a4e647d882c599f4ba31434 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-28741) Unexpected result if insert 'false' to boolean column
[ https://issues.apache.org/jira/browse/FLINK-28741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28741: --- Labels: pull-request-available (was: ) > Unexpected result if insert 'false' to boolean column > - > > Key: FLINK-28741 > URL: https://issues.apache.org/jira/browse/FLINK-28741 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Affects Versions: 1.15.0, 1.15.1 >Reporter: Jing Zhang >Priority: Major > Labels: pull-request-available > > Using hive dialect to insert a string 'false' to boolean column, the result > is true. It seems to treat all non-empty string as true. > The error could be reproduced in the following ITCase. > {code:java} > @Test > public void testUnExpectedResult() throws ExecutionException, > InterruptedException { > HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion()); > CoreModule coreModule = CoreModule.INSTANCE; > for (String loaded : tableEnv.listModules()) { > tableEnv.unloadModule(loaded); > } > tableEnv.loadModule("hive", hiveModule); > tableEnv.loadModule("core", coreModule); > // create source table > tableEnv.executeSql( > "CREATE TABLE test_table (params string) PARTITIONED BY (`p_date` > string)"); > // prepare a data which value is 'false' > tableEnv.executeSql("insert overwrite test_table partition(p_date = > '20220612') values ('false')") > .await(); > // create target table which only contain one boolean column > tableEnv.executeSql( > "CREATE TABLE target_table (flag boolean) PARTITIONED BY > (`p_date` string)"); > // > tableEnv.executeSql( > "insert overwrite table target_table partition(p_date = > '20220724') " > + "SELECT params FROM test_table WHERE > p_date='20220612'").await(); > TableImpl flinkTable = > (TableImpl) tableEnv.sqlQuery("select flag from target_table > where p_date = '20220724'"); >List results = > CollectionUtil.iteratorToList(flinkTable.execute().collect()); > assertEquals( > "[false]", results.toString()); > } 
{code}
[GitHub] [flink] luoyuxia opened a new pull request, #21367: [FLINK-28741][hive] fix unexpected behavior for cast string to boolean
luoyuxia opened a new pull request, #21367: URL: https://github.com/apache/flink/pull/21367 ## What is the purpose of the change To fix the unexpected behavior when casting a string to boolean ## Brief change log - Define a custom cast-to-boolean function and override the logic for casting string to boolean ## Verifying this change Added a test in HiveDialectQueryITCase ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
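The behavior at issue in FLINK-28741 can be illustrated with a self-contained sketch. Here `hiveStyle` models the "any non-empty string is true" semantics the Jira issue describes, and `literalStyle` the literal parsing users expect; neither is the actual Hive or Flink implementation:

```java
public class StringToBooleanCast {
    // Model of the reported behavior: any non-empty string casts to TRUE.
    static boolean hiveStyle(String s) {
        return s != null && !s.isEmpty();
    }

    // Literal-style cast: only the token "true" (case-insensitive) is TRUE.
    static boolean literalStyle(String s) {
        return Boolean.parseBoolean(s);
    }

    public static void main(String[] args) {
        // The surprising case from the issue: the string 'false'.
        System.out.println(hiveStyle("false"));
        System.out.println(literalStyle("false"));
    }
}
```

The two calls disagree on the input `"false"`, which is exactly the mismatch the ITCase in the issue exposes.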
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown does not work for partial update
JingsongLi commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029996831 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** Factory to create {@link MergeFunction}. */ +@FunctionalInterface +public interface MergeFunctionFactory extends Serializable { + +MergeFunction create(@Nullable int[][] projection); + +static MergeFunctionFactory of(MergeFunction mergeFunction) { +return new InstanceFactory<>(mergeFunction); +} + +/** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */ +class InstanceFactory implements MergeFunctionFactory { Review Comment: Nice point! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown does not work for partial update
JingsongLi commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029996540 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java: ## @@ -169,7 +172,7 @@ public RecordReader createReader(DataSplit split) throws IOException { List> sectionReaders = new ArrayList<>(); MergeFunctionWrapper mergeFuncWrapper = -new ReducerMergeFunctionWrapper(mergeFunction.copy()); +new ReducerMergeFunctionWrapper(mfFactory.create(valueProjection)); Review Comment: Yes, I should delete `.copy()`, we have factory now, we don't need to have a copy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown does not work for partial update
JingsongLi commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029996389 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** Factory to create {@link MergeFunction}. */ +@FunctionalInterface +public interface MergeFunctionFactory extends Serializable { + +MergeFunction create(@Nullable int[][] projection); + +static MergeFunctionFactory of(MergeFunction mergeFunction) { +return new InstanceFactory<>(mergeFunction); +} + +/** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */ +class InstanceFactory implements MergeFunctionFactory { + +private static final long serialVersionUID = 1L; + +private final MergeFunction mergeFunction; + +public InstanceFactory(MergeFunction mergeFunction) { +this.mergeFunction = mergeFunction; +} + +@Override +public MergeFunction create(@Nullable int[][] projection) { +return mergeFunction; Review Comment: I should change this to `new instance`. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29073) [FLIP-91] Support SQL Gateway(Part 2)
[ https://issues.apache.org/jira/browse/FLINK-29073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637552#comment-17637552 ] yuzelin commented on FLINK-29073: - [~eric.xiao] Hi, it's very nice that you can provide help. Recently, I'm work on the SQL Gateway and Client. Because I will change some public API in SQL gateway, so I am writing a FLIP now and it is near completion. I think we can have some talk after my proposal is accepted. Moreover, I have PRs to the subtask 6 & 7, if you are interested in, maybe you can take a look. And the subtask 5 (add rest api for 7) is still open, it would be nice if you can have a look at this ticket. The FLINK-15472 has more information about the SQL Gateway and rest api related changes. > [FLIP-91] Support SQL Gateway(Part 2) > - > > Key: FLINK-29073 > URL: https://issues.apache.org/jira/browse/FLINK-29073 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Client, Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: Shengkai Fang >Priority: Major > > Issue continues improving the SQL Gateway and allows the SQL Client submit > jobs to the SQL Gateway. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29073) [FLIP-91] Support SQL Gateway(Part 2)
[ https://issues.apache.org/jira/browse/FLINK-29073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637552#comment-17637552 ] yuzelin edited comment on FLINK-29073 at 11/23/22 3:28 AM: --- [~eric.xiao] Hi, it's very nice that you can provide help. Recently, I'm working on the SQL Gateway and Client. Because I will change some public API in SQL gateway, so I am writing a FLIP now and it is near completion. I think we can have some talk after my proposal is accepted. Moreover, I have PRs to the subtask 6 & 7, if you are interested in, maybe you can take a look. And the subtask 5 (add rest api for 7) is still open, it would be nice if you can have a look at this ticket. The FLINK-15472 has more information about the SQL Gateway and rest api related changes. was (Author: JIRAUSER293222): [~eric.xiao] Hi, it's very nice that you can provide help. Recently, I'm work on the SQL Gateway and Client. Because I will change some public API in SQL gateway, so I am writing a FLIP now and it is near completion. I think we can have some talk after my proposal is accepted. Moreover, I have PRs to the subtask 6 & 7, if you are interested in, maybe you can take a look. And the subtask 5 (add rest api for 7) is still open, it would be nice if you can have a look at this ticket. The FLINK-15472 has more information about the SQL Gateway and rest api related changes. > [FLIP-91] Support SQL Gateway(Part 2) > - > > Key: FLINK-29073 > URL: https://issues.apache.org/jira/browse/FLINK-29073 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Client, Table SQL / Gateway >Affects Versions: 1.17.0 >Reporter: Shengkai Fang >Priority: Major > > Issue continues improving the SQL Gateway and allows the SQL Client submit > jobs to the SQL Gateway. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #395: [FLINK-30125] Projection pushdown does not work for partial update
tsreaper commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029984979 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** Factory to create {@link MergeFunction}. */ +@FunctionalInterface +public interface MergeFunctionFactory extends Serializable { + +MergeFunction create(@Nullable int[][] projection); + +static MergeFunctionFactory of(MergeFunction mergeFunction) { +return new InstanceFactory<>(mergeFunction); +} + +/** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */ +class InstanceFactory implements MergeFunctionFactory { Review Comment: Create a factory for each merge function? So that users are not allowed to create a merge function instance directly (constructors should be private). They must create merge functions from the factories. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #395: [FLINK-30125] Projection pushdown does not work for partial update
tsreaper commented on code in PR #395: URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029984725 ## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java: ## @@ -246,4 +245,65 @@ public List keyFields(TableSchema schema) { return addKeyNamePrefix(schema.trimmedPrimaryKeysFields()); } } + +private static class PartialUpdateMergeFunctionFactory Review Comment: I prefer moving these factories to the inner class of the corresponding merge functions, or make them an independent class. ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java: ## @@ -169,7 +172,7 @@ public RecordReader createReader(DataSplit split) throws IOException { List> sectionReaders = new ArrayList<>(); MergeFunctionWrapper mergeFuncWrapper = -new ReducerMergeFunctionWrapper(mergeFunction.copy()); +new ReducerMergeFunctionWrapper(mfFactory.create(valueProjection)); Review Comment: `.copy()` is lost with current implementation. ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** Factory to create {@link MergeFunction}. */ +@FunctionalInterface +public interface MergeFunctionFactory extends Serializable { + +MergeFunction create(@Nullable int[][] projection); + +static MergeFunctionFactory of(MergeFunction mergeFunction) { +return new InstanceFactory<>(mergeFunction); +} + +/** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */ +class InstanceFactory implements MergeFunctionFactory { + +private static final long serialVersionUID = 1L; + +private final MergeFunction mergeFunction; + +public InstanceFactory(MergeFunction mergeFunction) { +this.mergeFunction = mergeFunction; +} + +@Override +public MergeFunction create(@Nullable int[][] projection) { +return mergeFunction; Review Comment: `mergeFunction.copy()`? ## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** Factory to create {@link MergeFunction}. */ +@FunctionalInterface +public interface MergeFunctionFactory extends Serializable { + +MergeFunction create(@Nullable int[][] projection); + +static MergeFunctionFactory of(MergeFunction mergeFunction) { +return new InstanceFactory<>(mergeFunction); +} + +/** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */ +class InstanceFactory implements MergeFunctionFactory { Review Comment: Create a factory for each merge function? So that users are not allowed to create a merge function instance directly. They must create merge functions from the factories. -- This is an automated message
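The review thread above turns on one point: `InstanceFactory.create` returned the shared `mergeFunction` instance, silently dropping the old `.copy()` call, so concurrent readers would share mutable merge state. A minimal, self-contained Java sketch of the copy-per-create fix the reviewers ask for (simplified, non-generic names; this is not the flink-table-store code):

```java
// Sketch of the "factory must hand out a copy" pattern from the review.
interface MergeFunction {
    void add(int value);
    int getResult();
    MergeFunction copy();
}

class SumMergeFunction implements MergeFunction {
    private int sum;
    public void add(int value) { sum += value; }
    public int getResult() { return sum; }
    // copy() yields a fresh instance with clean state.
    public MergeFunction copy() { return new SumMergeFunction(); }
}

interface MergeFunctionFactory {
    MergeFunction create();

    static MergeFunctionFactory of(MergeFunction prototype) {
        // Returning `prototype` directly would leak accumulated state across
        // readers; returning prototype.copy() gives each caller a clean instance.
        return prototype::copy;
    }
}

public class FactoryDemo {
    public static void main(String[] args) {
        MergeFunctionFactory factory = MergeFunctionFactory.of(new SumMergeFunction());
        MergeFunction a = factory.create();
        MergeFunction b = factory.create();
        a.add(5);
        System.out.println(a.getResult()); // 5
        System.out.println(b.getResult()); // 0 -- independent state
    }
}
```

With this shape, each `ReducerMergeFunctionWrapper` gets its own merge function, which is the behavior the removed `mergeFunction.copy()` used to guarantee.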
[jira] [Updated] (FLINK-30159) Add Transformer for ANOVATest
[ https://issues.apache.org/jira/browse/FLINK-30159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30159: --- Labels: pull-request-available (was: ) > Add Transformer for ANOVATest > - > > Key: FLINK-30159 > URL: https://issues.apache.org/jira/browse/FLINK-30159 > Project: Flink > Issue Type: Sub-task > Components: Library / Machine Learning >Reporter: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > Add Transformer for ANOVATest. > Its function would be at least equivalent to Spark's > org.apache.spark.ml.stat.ANOVATest. The relevant PR should contain the > following components: > * Java implementation/test (Must include) > * Python implementation/test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-ml] jiangxin369 opened a new pull request, #180: [FLINK-30159] Add Transformer for ANOVATest
jiangxin369 opened a new pull request, #180: URL: https://github.com/apache/flink-ml/pull/180 ## What is the purpose of the change Add Transformer for ANOVATest. ## Brief change log - Adds shared parameter `HasFlatten` - Adds Transformer implementation of ANOVATest in Java. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-30159) Add Transformer for ANOVATest
[ https://issues.apache.org/jira/browse/FLINK-30159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Xin updated FLINK-30159: -- Description: Add Transformer for ANOVATest. Its function would be at least equivalent to Spark's org.apache.spark.ml.stat.ANOVATest. The relevant PR should contain the following components: * Java implementation/test (Must include) * Python implementation/test was: Add the Estimator and Transformer for ANOVATest. Its function would be at least equivalent to Spark's org.apache.spark.ml.stat.ANOVATest. The relevant PR should contain the following components: * Java implementation/test (Must include) * Python implementation/test > Add Transformer for ANOVATest > - > > Key: FLINK-30159 > URL: https://issues.apache.org/jira/browse/FLINK-30159 > Project: Flink > Issue Type: Sub-task > Components: Library / Machine Learning >Reporter: Jiang Xin >Priority: Major > Fix For: ml-2.2.0 > > > Add Transformer for ANOVATest. > Its function would be at least equivalent to Spark's > org.apache.spark.ml.stat.ANOVATest. The relevant PR should contain the > following components: > * Java implementation/test (Must include) > * Python implementation/test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30160) Add Transformer for FValueTest
Jiang Xin created FLINK-30160: - Summary: Add Transformer for FValueTest Key: FLINK-30160 URL: https://issues.apache.org/jira/browse/FLINK-30160 Project: Flink Issue Type: Sub-task Components: Library / Machine Learning Reporter: Jiang Xin Fix For: ml-2.2.0 Add Transformer for FValueTest. Its function would be at least equivalent to Spark's org.apache.spark.ml.stat.FValueTest. The relevant PR should contain the following components: * Java implementation/test (Must include) * Python implementation/test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30159) Add Transformer for ANOVATest
Jiang Xin created FLINK-30159: - Summary: Add Transformer for ANOVATest Key: FLINK-30159 URL: https://issues.apache.org/jira/browse/FLINK-30159 Project: Flink Issue Type: Sub-task Components: Library / Machine Learning Reporter: Jiang Xin Fix For: ml-2.2.0 Add the Estimator and Transformer for ANOVATest. Its function would be at least equivalent to Spark's org.apache.spark.ml.stat.ANOVATest. The relevant PR should contain the following components: * Java implementation/test (Must include) * Python implementation/test -- This message was sent by Atlassian Jira (v8.20.10#820010)
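For context on what such a transformer computes, a one-way ANOVA F-statistic relates between-group variance to within-group variance. The toy Java sketch below is illustrative only; it is neither Flink ML nor Spark code.

```java
// Toy one-way ANOVA F-statistic: F = (SS_between / (k-1)) / (SS_within / (n-k))
// for k groups with n total observations.
public class OneWayAnova {

    public static double fStatistic(double[][] groups) {
        int k = groups.length;
        int n = 0;
        double grandSum = 0.0;
        for (double[] g : groups) {
            n += g.length;
            for (double x : g) grandSum += x;
        }
        double grandMean = grandSum / n;

        double ssBetween = 0.0;
        double ssWithin = 0.0;
        for (double[] g : groups) {
            double mean = 0.0;
            for (double x : g) mean += x;
            mean /= g.length;
            // between-group sum of squares, weighted by group size
            ssBetween += g.length * (mean - grandMean) * (mean - grandMean);
            // within-group sum of squares
            for (double x : g) ssWithin += (x - mean) * (x - mean);
        }
        return (ssBetween / (k - 1)) / (ssWithin / (n - k));
    }

    public static void main(String[] args) {
        double[][] groups = {{1, 2, 3}, {4, 5, 6}};
        System.out.println(fStatistic(groups)); // 13.5
    }
}
```

A production ANOVATest transformer additionally converts the F-statistic and degrees of freedom into a p-value per feature, which the sketch omits.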
[jira] [Closed] (FLINK-30139) CodeGenLoader fails when temporary directory is a symlink
[ https://issues.apache.org/jira/browse/FLINK-30139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-30139. Resolution: Fixed master: ae96b335e470a068a7e87f3d1a22c58bf2d82256 > CodeGenLoader fails when temporary directory is a symlink > - > > Key: FLINK-30139 > URL: https://issues.apache.org/jira/browse/FLINK-30139 > Project: Flink > Issue Type: Bug > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > > Same to FLINK-28102 -- This message was sent by Atlassian Jira (v8.20.10#820010)
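As a hedged illustration of this class of bug (an assumed approach, not the actual FLINK-30139 patch): when `java.io.tmpdir` is a symlink (e.g. `/tmp -> /private/tmp` on macOS), path-prefix checks against the configured temp directory can fail for files whose paths have been canonicalized. Resolving the temp directory with `toRealPath()` makes both sides comparable.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch: resolve java.io.tmpdir to its real path so prefix checks survive symlinks.
public class TmpDirResolver {

    public static Path resolvedTmpDir() throws IOException {
        Path tmp = Paths.get(System.getProperty("java.io.tmpdir"));
        // toRealPath() follows symbolic links and normalizes the path.
        return tmp.toRealPath();
    }

    public static void main(String[] args) throws IOException {
        Path realTmp = resolvedTmpDir();
        // A file created under the (possibly symlinked) temp dir is now seen
        // under the same real prefix.
        Path f = Files.createTempFile("codegen-", ".jar");
        System.out.println(f.toRealPath().startsWith(realTmp)); // true
        Files.delete(f);
    }
}
```

Comparing only real paths on both sides is what makes the check symlink-safe; comparing a real path against a symlinked prefix is where loaders like CodeGenLoader can trip.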
[GitHub] [flink-table-store] JingsongLi merged pull request #397: [FLINK-30139] CodeGenLoader fails when temporary directory is a symlink
JingsongLi merged PR #397: URL: https://github.com/apache/flink-table-store/pull/397 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-30143) Table Store fails when temporary directory is a symlink
[ https://issues.apache.org/jira/browse/FLINK-30143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng closed FLINK-30143. --- Resolution: Duplicate > Table Store fails when temporary directory is a symlink > --- > > Key: FLINK-30143 > URL: https://issues.apache.org/jira/browse/FLINK-30143 > Project: Flink > Issue Type: Bug > Components: Table Store >Affects Versions: table-store-0.3.0, table-store-0.2.2 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Fix For: table-store-0.3.0, table-store-0.2.2 > > > When {{java.io.tmpdir}} points to a symbolic link, the following exception > will be thrown: > {code} > java.lang.ExceptionInInitializerError > at > org.apache.flink.table.store.codegen.CodeGenLoader.getInstance(CodeGenLoader.java:118) > at > org.apache.flink.table.store.codegen.CodeGenUtils.newProjection(CodeGenUtils.java:47) > at > org.apache.flink.table.store.table.sink.SinkRecordConverter.(SinkRecordConverter.java:68) > at > org.apache.flink.table.store.table.sink.SinkRecordConverter.(SinkRecordConverter.java:50) > at > org.apache.flink.table.store.table.sink.SinkRecordConverterTest.converter(SinkRecordConverterTest.java:99) > at > org.apache.flink.table.store.table.sink.SinkRecordConverterTest.converter(SinkRecordConverterTest.java:75) > at > org.apache.flink.table.store.table.sink.SinkRecordConverterTest.testBucket(SinkRecordConverterTest.java:58) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66) > at > 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at >
[jira] [Closed] (FLINK-30101) Always use StandaloneClientHAServices to create RestClusterClient when retrieving a Flink on YARN cluster client
[ https://issues.apache.org/jira/browse/FLINK-30101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-30101. Resolution: Won't Do > Always use StandaloneClientHAServices to create RestClusterClient when > retrieving a Flink on YARN cluster client > > > Key: FLINK-30101 > URL: https://issues.apache.org/jira/browse/FLINK-30101 > Project: Flink > Issue Type: Improvement > Components: Client / Job Submission > Affects Versions: 1.16.0 > Reporter: Zhanghao Chen > Priority: Major > Fix For: 1.17.0 > > > *Problem* > Currently, the procedure of retrieving a Flink on YARN cluster client is as > follows (in YarnClusterDescriptor#retrieve method): > # Get application report from YARN > # Set rest.address & rest.port using the info from the application report > # Create a new RestClusterClient using the updated configuration, which will use the > client HA service to fetch rest.address & rest.port if HA is enabled > Here, we can see that the usage of client HA in step 3 is redundant, as we've > already got rest.address & rest.port from the YARN application report. When > ZK HA is enabled, this takes ~1.5 s to initialize client HA services and > fetch the rest IP & port. > 1.5 s can mean a lot for latency-sensitive client operations. In my company, > we use the Flink client to submit short-running session jobs and e2e latency is > critical. The job submission time is around 10 s on average, so 1.5 s would > mean a 15% time saving. > *Proposal* > When retrieving a Flink on YARN cluster client, use > StandaloneClientHAServices to > create the RestClusterClient instead, as we have pre-fetched rest.address & > rest.port from the YARN application report. This is also what we do in > KubernetesClusterDescriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-30101) Always use StandaloneClientHAServices to create RestClusterClient when retrieving a Flink on YARN cluster client
[ https://issues.apache.org/jira/browse/FLINK-30101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reopened FLINK-30101: -- > Always use StandaloneClientHAServices to create RestClusterClient when > retrieving a Flink on YARN cluster client > > > Key: FLINK-30101 > URL: https://issues.apache.org/jira/browse/FLINK-30101 > Project: Flink > Issue Type: Improvement > Components: Client / Job Submission > Affects Versions: 1.16.0 > Reporter: Zhanghao Chen > Priority: Major > Fix For: 1.17.0 > > > *Problem* > Currently, the procedure of retrieving a Flink on YARN cluster client is as > follows (in YarnClusterDescriptor#retrieve method): > # Get application report from YARN > # Set rest.address & rest.port using the info from the application report > # Create a new RestClusterClient using the updated configuration, which will use the > client HA service to fetch rest.address & rest.port if HA is enabled > Here, we can see that the usage of client HA in step 3 is redundant, as we've > already got rest.address & rest.port from the YARN application report. When > ZK HA is enabled, this takes ~1.5 s to initialize client HA services and > fetch the rest IP & port. > 1.5 s can mean a lot for latency-sensitive client operations. In my company, > we use the Flink client to submit short-running session jobs and e2e latency is > critical. The job submission time is around 10 s on average, so 1.5 s would > mean a 15% time saving. > *Proposal* > When retrieving a Flink on YARN cluster client, use > StandaloneClientHAServices to > create the RestClusterClient instead, as we have pre-fetched rest.address & > rest.port from the YARN application report. This is also what we do in > KubernetesClusterDescriptor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29427) LookupJoinITCase failed with classloader problem
[ https://issues.apache.org/jira/browse/FLINK-29427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637541#comment-17637541 ] Matthias Pohl commented on FLINK-29427: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=43380=logs=0c940707-2659-5648-cbe6-a1ad63045f0a=075c2716-8010-5565-fe08-3c4bb45824a4=26173 > LookupJoinITCase failed with classloader problem > > > Key: FLINK-29427 > URL: https://issues.apache.org/jira/browse/FLINK-29427 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0, 1.17.0 >Reporter: Huang Xingbo >Assignee: Alexander Smirnov >Priority: Blocker > Labels: pull-request-available, test-stability > > {code:java} > 2022-09-27T02:49:20.9501313Z Sep 27 02:49:20 Caused by: > org.codehaus.janino.InternalCompilerException: Compiling > "KeyProjection$108341": Trying to access closed classloader. Please check if > you store classloaders directly or indirectly in static fields. If the > stacktrace suggests that the leak occurs in a third party library and cannot > be fixed immediately, you can disable this check with the configuration > 'classloader.check-leaked-classloader'. 
> 2022-09-27T02:49:20.9502654Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382) > 2022-09-27T02:49:20.9503366Z Sep 27 02:49:20 at > org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > 2022-09-27T02:49:20.9504044Z Sep 27 02:49:20 at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > 2022-09-27T02:49:20.9504704Z Sep 27 02:49:20 at > org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > 2022-09-27T02:49:20.9505341Z Sep 27 02:49:20 at > org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > 2022-09-27T02:49:20.9505965Z Sep 27 02:49:20 at > org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > 2022-09-27T02:49:20.9506584Z Sep 27 02:49:20 at > org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > 2022-09-27T02:49:20.9507261Z Sep 27 02:49:20 at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104) > 2022-09-27T02:49:20.9507883Z Sep 27 02:49:20 ... 30 more > 2022-09-27T02:49:20.9509266Z Sep 27 02:49:20 Caused by: > java.lang.IllegalStateException: Trying to access closed classloader. Please > check if you store classloaders directly or indirectly in static fields. If > the stacktrace suggests that the leak occurs in a third party library and > cannot be fixed immediately, you can disable this check with the > configuration 'classloader.check-leaked-classloader'. 
> 2022-09-27T02:49:20.9510835Z Sep 27 02:49:20 at > org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184) > 2022-09-27T02:49:20.9511760Z Sep 27 02:49:20 at > org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192) > 2022-09-27T02:49:20.9512456Z Sep 27 02:49:20 at > java.lang.Class.forName0(Native Method) > 2022-09-27T02:49:20.9513014Z Sep 27 02:49:20 at > java.lang.Class.forName(Class.java:348) > 2022-09-27T02:49:20.9513649Z Sep 27 02:49:20 at > org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:89) > 2022-09-27T02:49:20.9514339Z Sep 27 02:49:20 at > org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:312) > 2022-09-27T02:49:20.9514990Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:8556) > 2022-09-27T02:49:20.9515659Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6749) > 2022-09-27T02:49:20.9516337Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594) > 2022-09-27T02:49:20.9516989Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573) > 2022-09-27T02:49:20.9517632Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215) > 2022-09-27T02:49:20.9518319Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481) > 2022-09-27T02:49:20.9519018Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476) > 2022-09-27T02:49:20.9519680Z Sep 27 02:49:20 at > org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928) > 2022-09-27T02:49:20.9520386Z Sep 27 02:49:20 at > org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476) > 2022-09-27T02:49:20.9521042Z Sep 27 02:49:20 at > 
org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469) > 2022-09-27T02:49:20.9521677Z Sep 27 02:49:20 at >
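The stack trace above ends in Flink's leaked-classloader check. The failure mode it guards against can be sketched in isolation; the class names below are illustrative stand-ins, not Flink's actual `SafetyNetWrapperClassLoader`:

```java
// Sketch: a wrapper classloader that refuses lookups after close, mirroring
// the "Trying to access closed classloader" check in the trace above.
class ClosableClassLoader extends ClassLoader {
    private volatile boolean closed = false;

    void close() {
        closed = true;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        if (closed) {
            throw new IllegalStateException(
                    "Trying to access closed classloader. Please check if you store "
                            + "classloaders directly or indirectly in static fields.");
        }
        return super.findClass(name);
    }
}

public class LeakDemo {
    // The leak pattern the check detects: a static field keeps the loader
    // reachable after the owning job has closed it.
    static ClassLoader cached;

    public static void main(String[] args) {
        ClosableClassLoader loader = new ClosableClassLoader();
        cached = loader; // leaked reference outlives the job
        loader.close();  // job finished, loader closed
        try {
            cached.loadClass("com.example.DoesNotExist"); // hypothetical class name
        } catch (IllegalStateException e) {
            System.out.println("caught IllegalStateException");
        } catch (ClassNotFoundException e) {
            System.out.println("class not found");
        }
    }
}
```

Per the trace, Janino compiling `KeyProjection$108341` plausibly hits this kind of path when generated code resolves types through a loader the finished job already closed; the durable fix is removing the cached reference, not disabling the check via `classloader.check-leaked-classloader`.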
[jira] [Closed] (FLINK-29661) DatabaseCalciteSchema$getTable() cannot get statistics for partition table
[ https://issues.apache.org/jira/browse/FLINK-29661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunhong Zheng closed FLINK-29661. - Resolution: Not A Bug > DatabaseCalciteSchema$getTable() cannot get statistics for partition table > -- > > Key: FLINK-29661 > URL: https://issues.apache.org/jira/browse/FLINK-29661 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.17.0 >Reporter: Yunhong Zheng >Priority: Major > Fix For: 1.17.0 > > > DatabaseCalciteSchema$getTable() cannot get statistics for partition tables. > DatabaseCalciteSchema$extractTableStats() does not consider the case where > the table is a partition table, whose stats need to be collected by > catalog.getPartitionStatistics() and catalog.getPartitionColumnStatistics(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29661) DatabaseCalciteSchema$getTable() cannot get statistics for partition table
[ https://issues.apache.org/jira/browse/FLINK-29661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637526#comment-17637526 ] Yunhong Zheng commented on FLINK-29661: --- [~martijnvisser] Hi, this is not a bug: we already introduced a new rule in the table planner to get partition table stats. I will close this Jira. Thanks. > DatabaseCalciteSchema$getTable() cannot get statistics for partition table > -- > > Key: FLINK-29661 > URL: https://issues.apache.org/jira/browse/FLINK-29661 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.17.0 >Reporter: Yunhong Zheng >Priority: Major > Fix For: 1.17.0 > > > DatabaseCalciteSchema$getTable() cannot get statistics for partition tables. > DatabaseCalciteSchema$extractTableStats() does not consider the case where > the table is a partition table, whose stats need to be collected by > catalog.getPartitionStatistics() and catalog.getPartitionColumnStatistics(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30131) flink iterate will suspend when record is a bit large
[ https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637525#comment-17637525 ] Lu commented on FLINK-30131: [~gaoyunhaii], I heard from [~Weijie Guo] that you are very knowledgeable in this field; could you give me some advice? Thanks a lot.
> flink iterate will suspend when record is a bit large
> -
>
> Key: FLINK-30131
> URL: https://issues.apache.org/jira/browse/FLINK-30131
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.15.2
> Reporter: Lu
> Priority: Major
> Attachments: image-2022-11-22-14-59-08-272.png
>
> {code:java}
> // code placeholder
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 1000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 1000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 1000);
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> for (int i = 1; i < 1; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[1000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> My code is above.
>
> When I use iterate with a large record, the iteration will suspend at a random place. When I looked at the thread dump, there was a suspicious thread: !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It looks like a network-related problem, so I increased the network buffer memory and count, but that only delays the suspension point; the job still suspends after iterating a few more times than before.
> I want to know whether this is a bug, or whether there is an error in my code or configuration.
> Looking forward to your reply. Thanks in advance.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30158) [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using repeated or map attributes
[ https://issues.apache.org/jira/browse/FLINK-30158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637524#comment-17637524 ] luoyuxia commented on FLINK-30158: -- I don't have much knowledge of the protobuf format, but I had a quick look at the related code. {code:java} FieldDescriptor elementFd = descriptor.findFieldByName(fieldName); String pbGetMessageElementCode = pbGetMessageElementCode( pbMessageVar, strongCamelFieldName, elementFd, PbFormatUtils.isArrayType(subType)); // NPE is thrown in calling this method {code} I think the reason is that the `elementFd` we got is null, so the NPE happens. So, how do you set the option `message-class-name` for the Pb format? Does it contain all field names defined in Flink SQL? > [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using > repeated or map attributes > - > > Key: FLINK-30158 > URL: https://issues.apache.org/jira/browse/FLINK-30158 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.16.0 >Reporter: James Mcguire >Priority: Major > > I am encountering a {{java.lang.NullPointerException}} exception when trying > to use Flink SQL to query a kafka topic that uses either {{repeated}} and/or > {{map}} attributes. > > *Replication steps* > # Use a protobuf definition that either uses repeated and/or map. This > protobuf schema should cover a few of the problematic scenarios I ran into: > > {code:java} > syntax = "proto3"; > package example.message; > option java_package = "com.example.message"; > option java_multiple_files = true; > message NestedType { > int64 nested_first = 1; > oneof nested_second { > int64 one_of_first = 2; > string one_of_second = 3; > } > } > message Test { > repeated int64 first = 1; > map second = 2; > } {code} > 2. Attempt query on topic, even excluding problematic columns: > > {code:java} > [ERROR] Could not execute SQL statement. 
Reason: > org.apache.flink.formats.protobuf.PbCodegenException: > java.lang.NullPointerException{code} > > > log file: > > {code:java} > 2022-11-22 15:33:59,510 WARN org.apache.flink.table.client.cli.CliClient > [] - Could not execute SQL > statement.org.apache.flink.table.client.gateway.SqlExecutionException: Error > while retrieving result.at > org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) > ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.lang.RuntimeException: > Failed to fetch next resultat > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) > ~[flink-dist-1.16.0.jar:1.16.0]at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > ~[flink-dist-1.16.0.jar:1.16.0]at > org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) > ~[?:?]at > org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) > ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.io.IOException: Failed > to fetch job execution resultat > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) > ~[flink-dist-1.16.0.jar:1.16.0]at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) > ~[flink-dist-1.16.0.jar:1.16.0]at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) > ~[flink-dist-1.16.0.jar:1.16.0]at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > ~[flink-dist-1.16.0.jar:1.16.0]at > 
org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) > ~[?:?]at > org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) > ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: > java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: bc869097009a92d0601add881a6b920c)at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > ~[?:?]at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) > ~[?:?]at >
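luoyuxia's hypothesis above is that `descriptor.findFieldByName(fieldName)` returns null when a field name declared in Flink SQL has no counterpart in the compiled protobuf class, and that the generated code then dereferences the null. This can be illustrated without a protobuf dependency; the lookup below is a plain-Java stand-in for the descriptor, with illustrative names:

```java
import java.util.Set;

// Stand-in for Descriptors.Descriptor#findFieldByName, which returns null
// (rather than throwing) when no field with the given name exists.
public class DescriptorLookupDemo {
    static String findFieldByName(Set<String> protoFields, String fieldName) {
        return protoFields.contains(fieldName) ? fieldName : null;
    }

    public static void main(String[] args) {
        // Field names from the `Test` message in the report above.
        Set<String> protoFields = Set.of("first", "second");

        // A column declared in Flink SQL but missing from the .proto
        // ("third" is a hypothetical column name):
        String fd = findFieldByName(protoFields, "third");
        if (fd == null) {
            // A null check at this point would produce a clear error
            // instead of the bare NPE reported in the issue.
            System.out.println("field 'third' not found in protobuf descriptor");
        } else {
            System.out.println("resolved field: " + fd);
        }
    }
}
```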
[jira] [Commented] (FLINK-30131) flink iterate will suspend when record is a bit large
[ https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637523#comment-17637523 ] Lu commented on FLINK-30131: [~Weijie Guo], thanks for your reply.
# Yes, my job hangs forever.
# I run this job in a local environment; you can reproduce the problem with the code above, without a config file.
# OK, got it, but increasing only the TM memory and network memory does not fix this problem. I traced the source code of LocalBufferPool and found that the other two configs are related. But as said above, this does not solve the fundamental problem.
> flink iterate will suspend when record is a bit large
> -
>
> Key: FLINK-30131
> URL: https://issues.apache.org/jira/browse/FLINK-30131
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.15.2
> Reporter: Lu
> Priority: Major
> Attachments: image-2022-11-22-14-59-08-272.png
>
> {code:java}
> // code placeholder
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 1000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 1000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 1000);
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> for (int i = 1; i < 1; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new byte[1000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> My code is above.
>
> When I use iterate with a large record, the iteration will suspend at a random place. When I looked at the thread dump, there was a suspicious thread: !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> It looks like a network-related problem, so I increased the network buffer memory and count, but that only delays the suspension point; the job still suspends after iterating a few more times than before.
> I want to know whether this is a bug, or whether there is an error in my code or configuration.
> Looking forward to your reply. Thanks in advance.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #174: [FLINK-29604] Add Estimator and Transformer for CountVectorizer
yunfengzhou-hub commented on code in PR #174: URL: https://github.com/apache/flink-ml/pull/174#discussion_r1029954520 ## docs/content/docs/operators/feature/countvectorizer.md: ## @@ -0,0 +1,182 @@ +--- +title: "Count Vectorizer" +weight: 1 +type: docs +aliases: +- /operators/feature/countvectorizer.html +--- + + + +## Count Vectorizer + +CountVectorizer aims to help convert a collection of text documents to +vectors of token counts. When an a-priori dictionary is not available, +CountVectorizer can be used as an estimator to extract the vocabulary +and generate a CountVectorizerModel. The model produces sparse +representations for the documents over the vocabulary, which can then +be passed to other algorithms like LDA. + +### Input Columns + +| Param name | Type | Default | Description | |:---|:-|:--|:| | inputCol | String[] | `"input"` | Input string array. | + +### Output Columns + +| Param name | Type | Default| Description | |:---|:-|:---|:| | outputCol | SparseVector | `"output"` | Vector of token counts. | + +### Parameters + +Below are the parameters required by `CountVectorizerModel`. + +| Key| Default| Type| Required | Description | |||-|--|-| | inputCol | `"input"` | String | no | Input column name. | | outputCol | `"output"` | String | no | Output column name. | | minTF | `1.0` | Double | no | Filter to ignore rare words in a document. For each document, terms with frequency/count less than the given threshold are ignored. If this is an integer >= 1, then this specifies a count (of times the term must appear in the document); if this is a double in [0,1), then this specifies a fraction (out of the document's token count). | Review Comment: According to offline discussion, we'll allow users to set the parameter to any double value larger than 1, so as to keep aligned with the function of the count vectorizer in Spark. However, it is still not recommended for users to set this parameter to such values. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
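The `minTF` semantics discussed in the review above (a value >= 1 acts as an absolute count, a value in [0, 1) acts as a fraction of the document's token count) can be sketched as a predicate. This is a hypothetical helper for illustration, not Flink ML's actual implementation:

```java
// Sketch of the minTF filtering rule: terms whose in-document frequency falls
// below the threshold are ignored when building the count vector.
public class MinTfDemo {
    static boolean keepTerm(long termCount, long docTokenCount, double minTF) {
        // Values in [0, 1) are a fraction of the document's token count;
        // values >= 1 are an absolute occurrence count.
        double threshold = minTF < 1.0 ? minTF * docTokenCount : minTF;
        return termCount >= threshold;
    }

    public static void main(String[] args) {
        // Document with 10 tokens; the term appears twice.
        System.out.println(keepTerm(2, 10, 1.0));  // count threshold 1    -> true
        System.out.println(keepTerm(2, 10, 0.25)); // fraction: 2.5 tokens -> false
        System.out.println(keepTerm(2, 10, 3.0));  // count threshold 3    -> false
    }
}
```

This also shows why the review allows double values larger than 1: they simply behave as (possibly fractional) absolute count thresholds, matching Spark's CountVectorizer.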
[jira] [Updated] (FLINK-30030) Unexpected behavior for overwrite in Hive dialect
[ https://issues.apache.org/jira/browse/FLINK-30030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia updated FLINK-30030: - Description: When overwriting a table in Hive dialect, it might not overwrite but append. It can be reproduced using the following SQL: {code:java} insert overwrite table `db.tab` values (1, 2) {code} This only happens when the table name is wrapped in backquotes and contains two parts separated by ".". I just found that the behavior in Hive is also incorrect, as reported in HIVE-26741. Let's keep watching to see what the expected behavior is. was: When overwriting a table in Hive dialect, it might not overwrite but append. It can be reproduced using the following SQL: {code:java} insert into table `db.tab` values (1, 2) {code} This only happens when the table name is wrapped in backquotes and contains two parts separated by ".". I just found that the behavior in Hive is also incorrect, as reported in HIVE-26741. Let's keep watching to see what the expected behavior is. > Unexpected behavior for overwrite in Hive dialect > - > > Key: FLINK-30030 > URL: https://issues.apache.org/jira/browse/FLINK-30030 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.15.0 >Reporter: luoyuxia >Priority: Major > Labels: pull-request-available > > When overwriting a table in Hive dialect, it might not overwrite but append. > It can be reproduced using the following SQL: > {code:java} > insert overwrite table `db.tab` values (1, 2) {code} > This only happens when the table name is wrapped in backquotes and contains > two parts separated by ".". > > I just found that the behavior in Hive is also incorrect, as reported in > HIVE-26741. Let's keep watching to see what the expected behavior is. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30158) [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using repeated or map attributes
James Mcguire created FLINK-30158: - Summary: [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using repeated or map attributes Key: FLINK-30158 URL: https://issues.apache.org/jira/browse/FLINK-30158 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem Affects Versions: 1.16.0 Reporter: James Mcguire I am encountering a {{java.lang.NullPointerException}} exception when trying to use Flink SQL to query a kafka topic that uses either {{repeated}} and/or {{map}} attributes. *Replication steps* # Use a protobuf definition that either uses repeated and/or map. This protobuf schema should cover a few of the problematic scenarios I ran into: {code:java} syntax = "proto3"; package example.message; option java_package = "com.example.message"; option java_multiple_files = true; message NestedType { int64 nested_first = 1; oneof nested_second { int64 one_of_first = 2; string one_of_second = 3; } } message Test { repeated int64 first = 1; map second = 2; } {code} 2. Attempt query on topic, even excluding problematic columns: {code:java} [ERROR] Could not execute SQL statement. 
Reason: org.apache.flink.formats.protobuf.PbCodegenException: java.lang.NullPointerException{code} log file: {code:java} 2022-11-22 15:33:59,510 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result.at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.lang.RuntimeException: Failed to fetch next resultat org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.io.IOException: Failed to fetch job execution resultat org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bc869097009a92d0601add881a6b920c)at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) ~[?:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0]at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?]at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bc869097009a92d0601add881a6b920c)at
[GitHub] [flink-connector-aws] hlteoh37 opened a new pull request, #27: [FLINK-29900][Connectors/DynamoDB] Make configurations for DynamoDB T…
hlteoh37 opened a new pull request, #27: URL: https://github.com/apache/flink-connector-aws/pull/27 …able API sink follow the standard AWS and HTTP client convention ## What is the purpose of the change Update DynamoDB Table API table configurations to follow the same convention used by Kinesis connector and Firehose connector for AWS and HTTP client configs. In particular: - Table API's `aws.credentials.*` maps to `aws.credentials.provider.*` - Table API's `sink.http-client.max-concurrency` maps to `aws.http-client.max-concurrency` - Table API's `sink.http-client.read-timeout` maps to `aws.http-client.read-timeout` - Table API's `sink.http-client.protocol.version` maps to `aws.http.protocol.version` ## Verifying this change - Unit tests - Manual tests - Built locally and verified that the configurations are passed into the `DynamoDbSink` ## Does this pull request potentially affect one of the following parts: * Dependencies (does it add or upgrade a dependency): no * The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no * The serializers: no * The runtime per-record code paths (performance sensitive): no * Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no * The S3 file system connector: no ## Documentation * Does this pull request introduce a new feature? no * If yes, how is the feature documented? n/a -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
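The renaming the PR describes can be summarized as a key translation table. The sketch below restates the mapping from the PR text and is illustrative; it is not the connector's actual option-resolution code, and the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: Table API connector options are translated onto the standard
// AWS/HTTP client keys shared with the Kinesis and Firehose connectors.
public class OptionMappingDemo {
    static final Map<String, String> TABLE_TO_CLIENT_KEY = new HashMap<>();
    static {
        // Prefix/key mappings restated from the PR description above.
        TABLE_TO_CLIENT_KEY.put("aws.credentials", "aws.credentials.provider");
        TABLE_TO_CLIENT_KEY.put("sink.http-client.max-concurrency", "aws.http-client.max-concurrency");
        TABLE_TO_CLIENT_KEY.put("sink.http-client.read-timeout", "aws.http-client.read-timeout");
        TABLE_TO_CLIENT_KEY.put("sink.http-client.protocol.version", "aws.http.protocol.version");
    }

    // Unknown keys pass through unchanged.
    static String translate(String tableOption) {
        return TABLE_TO_CLIENT_KEY.getOrDefault(tableOption, tableOption);
    }

    public static void main(String[] args) {
        System.out.println(translate("sink.http-client.max-concurrency"));
        // prints: aws.http-client.max-concurrency
    }
}
```

Keeping the translation in one table like this makes it easy to verify that the Table API surface stays aligned with the `DynamoDbSink` client configuration convention.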
[jira] [Created] (FLINK-30157) Trigger Events Before JM Recovery and Unhealthy Job Restarts
Matyas Orhidi created FLINK-30157: - Summary: Trigger Events Before JM Recovery and Unhealthy Job Restarts Key: FLINK-30157 URL: https://issues.apache.org/jira/browse/FLINK-30157 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.3.0 Reporter: Matyas Orhidi Fix For: kubernetes-operator-1.3.0 We should emit specific events for the following cases: * JM recovery * Unhealthy Job Restarts -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #26: [FLINK-25859][Connectors/DynamoDB][docs] DynamoDB sink documentation
hlteoh37 commented on code in PR #26: URL: https://github.com/apache/flink-connector-aws/pull/26#discussion_r1029864477 ## docs/content.zh/docs/connectors/datastream/dynamodb.md: ## @@ -0,0 +1,170 @@ +--- +title: DynamoDB +weight: 5 +type: docs +--- + + +# Amazon DynamoDB Sink + +The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). + +Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) +to setup a table. + +To use the connector, add the following Maven dependency to your project: + +{{< artifact flink-connector-dynamodb >}} Review Comment: I meant more the connector - flink mapping! e.g. `1.0.0_1.15` or `1.1.0_1.15`. How will we know whether to include `1.1.0` or `1.0.0`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #26: [FLINK-25859][Connectors/DynamoDB][docs] DataStream sink documentation
dannycranmer commented on code in PR #26: URL: https://github.com/apache/flink-connector-aws/pull/26#discussion_r1029850320 ## docs/content.zh/docs/connectors/datastream/dynamodb.md: ## @@ -0,0 +1,170 @@ +--- +title: DynamoDB +weight: 5 +type: docs +--- + + +# Amazon DynamoDB Sink + +The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). + +Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) +to setup a table. + +To use the connector, add the following Maven dependency to your project: + +{{< artifact flink-connector-dynamodb >}} + +{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}} +{{< tab "Java" >}} +```java +Properties sinkProperties = new Properties(); +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); +// Optional, provide via alternative routes e.g. 
environment variables +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +ElementConverter elementConverter = new CustomElementConverter(); + +DynamoDbSink dynamoDbSink = +DynamoDbSink.builder() +.setDynamoDbProperties(sinkProperties) // Required +.setTableName("my-dynamodb-table") // Required +.setElementConverter(elementConverter) // Required +.setOverwriteByPartitionKeys(singletonList("key")) // Optional +.setFailOnError(false) // Optional +.setMaxBatchSize(25) // Optional +.setMaxInFlightRequests(50) // Optional +.setMaxBufferedRequests(10_000) // Optional +.setMaxTimeInBufferMS(5000) // Optional +.build(); + +flinkStream.sinkTo(dynamoDbSink); +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +val sinkProperties = new Properties() +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1") +// Optional, provide via alternative routes e.g. environment variables +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") + +val elementConverter = new CustomElementConverter() + +val dynamoDbSink = + DynamoDbSink.builder() + .setDynamoDbProperties(sinkProperties) // Required + .setTableName("my-dynamodb-table") // Required + .setElementConverter(elementConverter) // Required + .setOverwriteByPartitionKeys(singletonList("key")) // Optional + .setFailOnError(false) // Optional + .setMaxBatchSize(25) // Optional + .setMaxInFlightRequests(50) // Optional + .setMaxBufferedRequests(10_000) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .build() + +flinkStream.sinkTo(dynamoDbSink) +``` +{{< /tab >}} +{{< /tabs >}} + +## Configurations + +Flink's DynamoDB sink is created by using the static builder `DynamoDbSink.builder()`. + +1. __setDynamoDbProperties(Properties sinkProperties)__ + * Required. 
+ * Supplies credentials, region and other parameters to the DynamoDB client. +2. __setTableName(String tableName)__ + * Required. + * Name of the table to sink to. +3. __setElementConverter(ElementConverter elementConverter)__ + * Required. + * Converts generic records of type `InputType` to `DynamoDbWriteRequest`. +4. __setOverwriteByPartitionKeys(List partitionKeys)__ + * Optional. + * Used to deduplicate write requests within each batch pushed to DynamoDB. +5. _setFailOnError(boolean failOnError)_ + * Optional. Default: `false`. + * Whether failed requests to write records are treated as fatal exceptions in the sink. +6. _setMaxBatchSize(int maxBatchSize)_ + * Optional. Default: `500`. + * Maximum size of a batch to write. +7. _setMaxInFlightRequests(int maxInFlightRequests)_ + * Optional. Default: `50`. + * The maximum number of in flight requests allowed before the sink applies backpressure. +8. _setMaxBufferedRequests(int maxBufferedRequests)_ + * Optional. Default: `10_000`. + * The maximum number of records that may be buffered in the sink before backpressure is applied. +9. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_ + * N/A. + * This configuration is not supported, see [FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854). +10. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_ +*
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #26: [FLINK-25859][Connectors/DynamoDB][docs] DataStream sink documentation
dannycranmer commented on code in PR #26: URL: https://github.com/apache/flink-connector-aws/pull/26#discussion_r1029848582 ## docs/content.zh/docs/connectors/datastream/dynamodb.md: ## @@ -0,0 +1,170 @@ +--- +title: DynamoDB +weight: 5 +type: docs +--- + + +# Amazon DynamoDB Sink + +The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). + +Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) +to setup a table. + +To use the connector, add the following Maven dependency to your project: + +{{< artifact flink-connector-dynamodb >}} + +{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}} +{{< tab "Java" >}} +```java +Properties sinkProperties = new Properties(); +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); +// Optional, provide via alternative routes e.g. 
environment variables +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +ElementConverter elementConverter = new CustomElementConverter(); + +DynamoDbSink dynamoDbSink = +DynamoDbSink.builder() +.setDynamoDbProperties(sinkProperties) // Required +.setTableName("my-dynamodb-table") // Required +.setElementConverter(elementConverter) // Required +.setOverwriteByPartitionKeys(singletonList("key")) // Optional +.setFailOnError(false) // Optional +.setMaxBatchSize(25)// Optional +.setMaxInFlightRequests(50) // Optional +.setMaxBufferedRequests(10_000) // Optional +.setMaxTimeInBufferMS(5000) // Optional +.build(); + +flinkStream.sinkTo(dynamoDbSink); +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +Properties sinkProperties = new Properties() +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1") +// Optional, provide via alternative routes e.g. environment variables +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") + +val elementConverter = new CustomElementConverter(); + +val dynamoDbSink = + DynamoDbSink.builder() + .setDynamoDbProperties(sinkProperties) // Required + .setTableName("my-dynamodb-table") // Required + .setElementConverter(elementConverter) // Required + .setOverwriteByPartitionKeys(singletonList("key")) // Optional + .setFailOnError(false) // Optional + .setMaxBatchSize(25)// Optional + .setMaxInFlightRequests(50) // Optional + .setMaxBufferedRequests(10_000) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .build() + +flinkStream.sinkTo(dynamoDbSink) +``` +{{< /tab >}} +{{< /tabs >}} + +## Configurations + +Flink's DynamoDB sink is created by using the static builder `DynamoDBSink.builder()`. + +1. __setDynamoDbProperties(Properties sinkProperties)__ + * Required. 
+ * Supplies credentials, region and other parameters to the DynamoDB client. +2. __setTableName(String tableName)__ + * Required. + * Name of the table to sink to. +3. __setElementConverter(ElementConverter elementConverter)__ + * Required. + * Converts generic records of type `InputType` to `DynamoDbWriteRequest`. +4. __setOverwriteByPartitionKeys(List partitionKeys)__ Review Comment: Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
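The `setElementConverter` option quoted above plugs a per-record conversion into the sink, turning each `InputType` record into a write request. A minimal stand-in sketch in plain Java (the `ElementConverter` interface and `WriteRequest` type here are simplified stand-ins for illustration, not the connector's real `DynamoDbWriteRequest` API):

```java
import java.util.Map;

public class ConverterSketch {
    // Simplified stand-ins for the connector's ElementConverter and
    // DynamoDbWriteRequest types; the real interfaces live in
    // flink-connector-dynamodb and differ in signature.
    interface ElementConverter<In, Out> {
        Out apply(In element);
    }

    record WriteRequest(Map<String, String> item) {}

    // One concrete converter: map an input string to a keyed item.
    static final ElementConverter<String, WriteRequest> CONVERTER =
            s -> new WriteRequest(Map.of("pk", s, "length", String.valueOf(s.length())));

    public static void main(String[] args) {
        WriteRequest req = CONVERTER.apply("order-123");
        System.out.println(req.item().get("length")); // "9"
    }
}
```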
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #26: [FLINK-25859][Connectors/DynamoDB][docs] DataStream sink documentation
dannycranmer commented on code in PR #26: URL: https://github.com/apache/flink-connector-aws/pull/26#discussion_r1029845175 ## docs/content.zh/docs/connectors/datastream/dynamodb.md: ## @@ -0,0 +1,170 @@ +--- +title: DynamoDB +weight: 5 +type: docs +--- + + +# Amazon DynamoDB Sink + +The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). + +Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) +to setup a table. + +To use the connector, add the following Maven dependency to your project: + +{{< artifact flink-connector-dynamodb >}} Review Comment: These docs are included into the Flink docs, so it should resolve the variables ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #26: [FLINK-25859][Connectors/DynamoDB][docs] DataStream sink documentation
dannycranmer commented on code in PR #26: URL: https://github.com/apache/flink-connector-aws/pull/26#discussion_r1029842814 ## docs/content.zh/docs/connectors/datastream/dynamodb.md: ## @@ -0,0 +1,170 @@ +--- +title: DynamoDB +weight: 5 +type: docs Review Comment: Not sure, copied firehose and did not have one. I will add. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-29475) Add WARNING/ERROR checker for the operator in e2e tests
[ https://issues.apache.org/jira/browse/FLINK-29475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-29475. -- Fix Version/s: kubernetes-operator-1.3.0 Resolution: Fixed merged to main af00c99defbe49c84dbd8a3ac4341136ca3efac9 > Add WARNING/ERROR checker for the operator in e2e tests > --- > > Key: FLINK-29475 > URL: https://issues.apache.org/jira/browse/FLINK-29475 > Project: Flink > Issue Type: Improvement >Affects Versions: kubernetes-operator-1.3.0 >Reporter: Matyas Orhidi >Assignee: Gabor Somogyi >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.3.0 > > > We can also try eliminating unwanted warnings like: > {{[WARN ] The client is using resource type 'flinkdeployments' with unstable > version 'v1beta1'}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
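The WARNING/ERROR checker described in this issue scans operator logs for unexpected entries, with an allowlist to suppress known-benign warnings like the unstable-version one quoted above. A plain-Java sketch of the filtering logic (the actual operator check lives in the e2e test harness; this is only an illustration of the approach):

```java
import java.util.List;

public class LogChecker {
    // Return log lines at WARN/ERROR level that are not covered by the
    // allowlist of known-benign substrings.
    static List<String> unexpected(List<String> logLines, List<String> allowedSubstrings) {
        return logLines.stream()
                .filter(l -> l.contains("[WARN ]") || l.contains("[ERROR]"))
                .filter(l -> allowedSubstrings.stream().noneMatch(l::contains))
                .toList();
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "[INFO ] Reconciling deployment",
                "[WARN ] The client is using resource type 'flinkdeployments' with unstable version 'v1beta1'",
                "[ERROR] Failed to submit job");
        List<String> bad = unexpected(lines, List.of("unstable version"));
        System.out.println(bad.size()); // 1: only the ERROR line is unexpected
    }
}
```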
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #446: [FLINK-29475] Add error checker for the operator in e2e tests
gyfora merged PR #446: URL: https://github.com/apache/flink-kubernetes-operator/pull/446
[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #26: [FLINK-25859][Connectors/DynamoDB][docs] DataStream sink documentation
hlteoh37 commented on code in PR #26: URL: https://github.com/apache/flink-connector-aws/pull/26#discussion_r1029786873 ## docs/content.zh/docs/connectors/datastream/dynamodb.md: ## @@ -0,0 +1,170 @@ +--- +title: DynamoDB +weight: 5 +type: docs +--- + + +# Amazon DynamoDB Sink + +The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). + +Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) +to setup a table. + +To use the connector, add the following Maven dependency to your project: + +{{< artifact flink-connector-dynamodb >}} + +{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}} +{{< tab "Java" >}} +```java +Properties sinkProperties = new Properties(); +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); +// Optional, provide via alternative routes e.g. 
environment variables +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +ElementConverter elementConverter = new CustomElementConverter(); + +DynamoDbSink dynamoDbSink = +DynamoDbSink.builder() +.setDynamoDbProperties(sinkProperties) // Required +.setTableName("my-dynamodb-table") // Required +.setElementConverter(elementConverter) // Required +.setOverwriteByPartitionKeys(singletonList("key")) // Optional +.setFailOnError(false) // Optional +.setMaxBatchSize(25)// Optional +.setMaxInFlightRequests(50) // Optional +.setMaxBufferedRequests(10_000) // Optional +.setMaxTimeInBufferMS(5000) // Optional +.build(); + +flinkStream.sinkTo(dynamoDbSink); Review Comment: nit: spacing ## docs/content.zh/docs/connectors/datastream/dynamodb.md: ## @@ -0,0 +1,170 @@ +--- +title: DynamoDB +weight: 5 +type: docs +--- + + +# Amazon DynamoDB Sink + +The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). + +Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html) +to setup a table. + +To use the connector, add the following Maven dependency to your project: + +{{< artifact flink-connector-dynamodb >}} + +{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}} +{{< tab "Java" >}} +```java +Properties sinkProperties = new Properties(); +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); +// Optional, provide via alternative routes e.g. 
environment variables +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +ElementConverter elementConverter = new CustomElementConverter(); + +DynamoDbSink dynamoDbSink = +DynamoDbSink.builder() +.setDynamoDbProperties(sinkProperties) // Required +.setTableName("my-dynamodb-table") // Required +.setElementConverter(elementConverter) // Required +.setOverwriteByPartitionKeys(singletonList("key")) // Optional +.setFailOnError(false) // Optional +.setMaxBatchSize(25)// Optional +.setMaxInFlightRequests(50) // Optional +.setMaxBufferedRequests(10_000) // Optional +.setMaxTimeInBufferMS(5000) // Optional +.build(); + +flinkStream.sinkTo(dynamoDbSink); +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +Properties sinkProperties = new Properties() +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1") +// Optional, provide via alternative routes e.g. environment variables +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") + +val elementConverter = new CustomElementConverter(); + +val dynamoDbSink = + DynamoDbSink.builder() + .setDynamoDbProperties(sinkProperties) // Required + .setTableName("my-dynamodb-table") // Required + .setElementConverter(elementConverter) // Required +
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
reta commented on code in PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029752612 ## flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/table/IndexGeneratorTest.java: ## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.table; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.temporal.UnsupportedTemporalTypeException; +import java.util.Arrays; +import java.util.List; + +/** Suite tests for {@link IndexGenerator}. 
*/ +class IndexGeneratorTest { + +private static final List fieldNames = +Arrays.asList( +"id", +"item", +"log_ts", +"log_date", +"order_timestamp", +"log_time", +"local_datetime", +"local_date", +"local_time", +"note", +"status"); + +private static final List dataTypes = +Arrays.asList( +DataTypes.INT(), +DataTypes.STRING(), +DataTypes.BIGINT(), +DataTypes.DATE().bridgedTo(Date.class), +DataTypes.TIMESTAMP().bridgedTo(Timestamp.class), +DataTypes.TIME().bridgedTo(Time.class), +DataTypes.TIMESTAMP().bridgedTo(LocalDateTime.class), +DataTypes.DATE().bridgedTo(LocalDate.class), +DataTypes.TIME().bridgedTo(LocalTime.class), +DataTypes.STRING(), +DataTypes.BOOLEAN()); + +private static final List rows = +Arrays.asList( +GenericRowData.of( +1, +StringData.fromString("apple"), +Timestamp.valueOf("2020-03-18 12:12:14").getTime(), +(int) Date.valueOf("2020-03-18").toLocalDate().toEpochDay(), + TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-18 12:12:14")), +(int) + (Time.valueOf("12:12:14").toLocalTime().toNanoOfDay() +/ 1_000_000L), +TimestampData.fromLocalDateTime( +LocalDateTime.of(2020, 3, 18, 12, 12, 14, 1000)), +(int) LocalDate.of(2020, 3, 18).toEpochDay(), +(int) (LocalTime.of(12, 13, 14, 2000).toNanoOfDay() / 1_000_000L), +"test1", +true), +GenericRowData.of( +2, +StringData.fromString("peanut"), +Timestamp.valueOf("2020-03-19 12:22:14").getTime(), +(int) Date.valueOf("2020-03-19").toLocalDate().toEpochDay(), + TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-19 12:22:21")), +(int) + (Time.valueOf("12:22:21").toLocalTime().toNanoOfDay() +/ 1_000_000L), +TimestampData.fromLocalDateTime( +LocalDateTime.of(2020, 3, 19, 12, 22, 14, 1000)), +(int) LocalDate.of(2020, 3, 19).toEpochDay(), +(int) (LocalTime.of(12, 13, 14,
[GitHub] [flink-connector-opensearch] reta commented on pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
reta commented on PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#issuecomment-1324052597 > The architecture tests are missing. Have a look at the ES connector. Added
[jira] [Updated] (FLINK-30156) [FLIP-242] Blogpost about the customisable RateLimitingStrategy in the AsyncSinkBase
[ https://issues.apache.org/jira/browse/FLINK-30156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30156: --- Labels: pull-request-available (was: ) > [FLIP-242] Blogpost about the customisable RateLimitingStrategy in the > AsyncSinkBase > > > Key: FLINK-30156 > URL: https://issues.apache.org/jira/browse/FLINK-30156 > Project: Flink > Issue Type: Improvement >Reporter: Hong Liang Teoh >Priority: Major > Labels: pull-request-available > > Create a blogpost to explain the customisability of the RateLimitingStrategy > in the AsyncSinkBase. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-web] hlteoh37 opened a new pull request, #584: [FLINK-30156] Add blogpost for the Async sink custom RateLimitingStra…
hlteoh37 opened a new pull request, #584: URL: https://github.com/apache/flink-web/pull/584 …tegy Documentation for the custom RateLimitingStrategy for the Async Sink [FLINK-28487](https://issues.apache.org/jira/browse/FLINK-28487).
[jira] [Created] (FLINK-30156) [FLIP-242] Blogpost about the customisable RateLimitingStrategy in the AsyncSinkBase
Hong Liang Teoh created FLINK-30156: --- Summary: [FLIP-242] Blogpost about the customisable RateLimitingStrategy in the AsyncSinkBase Key: FLINK-30156 URL: https://issues.apache.org/jira/browse/FLINK-30156 Project: Flink Issue Type: Improvement Reporter: Hong Liang Teoh Create a blogpost to explain the customisability of the RateLimitingStrategy in the AsyncSinkBase.
[jira] [Updated] (FLINK-29938) Add open() Method to AsyncSink ElementConverter
[ https://issues.apache.org/jira/browse/FLINK-29938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-29938: -- Fix Version/s: 1.17.0 > Add open() Method to AsyncSink ElementConverter > --- > > Key: FLINK-29938 > URL: https://issues.apache.org/jira/browse/FLINK-29938 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > The {{ElementConverter}} is used to convert records to objects that a Sink > can deliver to a destination. In some sink implementations, the > {{ElementConverter}} needs to be serialized and sent to TM, DynamoDB is a > good example [1]. For DynamoDB we need to lazily instantiate some objects, > and an `open()` method would provide a clean hook for this. > [1] > https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java#L57 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-29938) Add open() Method to AsyncSink ElementConverter
[ https://issues.apache.org/jira/browse/FLINK-29938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-29938. --- Resolution: Done > Add open() Method to AsyncSink ElementConverter > --- > > Key: FLINK-29938 > URL: https://issues.apache.org/jira/browse/FLINK-29938 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > The {{ElementConverter}} is used to convert records to objects that a Sink > can deliver to a destination. In some sink implementations, the > {{ElementConverter}} needs to be serialized and sent to TM, DynamoDB is a > good example [1]. For DynamoDB we need to lazily instantiate some objects, > and an `open()` method would provide a clean hook for this. > [1] > https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java#L57 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29938) Add open() Method to AsyncSink ElementConverter
[ https://issues.apache.org/jira/browse/FLINK-29938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637367#comment-17637367 ] Danny Cranmer commented on FLINK-29938: --- Merged commit [{{91c4d86}}|https://github.com/apache/flink/commit/91c4d865eabad0f7f1b8c7426d87e86afa06d6f6] into apache:master > Add open() Method to AsyncSink ElementConverter > --- > > Key: FLINK-29938 > URL: https://issues.apache.org/jira/browse/FLINK-29938 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > The {{ElementConverter}} is used to convert records to objects that a Sink > can deliver to a destination. In some sink implementations, the > {{ElementConverter}} needs to be serialized and sent to TM, DynamoDB is a > good example [1]. For DynamoDB we need to lazily instantiate some objects, > and an `open()` method would provide a clean hook for this. > [1] > https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java#L57 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
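The `open()` hook described in FLINK-29938 lets a serialized `ElementConverter` build its non-serializable members lazily once it reaches the task manager. A plain-Java sketch of the pattern (class and method names here are illustrative, not the Flink API):

```java
import java.io.Serializable;

public class OpenHookSketch {
    // A heavyweight, non-serializable dependency (stand-in for an SDK client
    // like the DynamoDB enhanced client mentioned in the issue).
    static class Client {
        String put(String item) {
            return "stored:" + item;
        }
    }

    // The converter is shipped to the task manager in serialized form, so the
    // client must be created in open(), not in the constructor, and must be
    // excluded from serialization via `transient`.
    static class LazyConverter implements Serializable {
        private transient Client client;

        void open() {
            client = new Client();
        }

        String convert(String element) {
            if (client == null) {
                throw new IllegalStateException("open() was not called");
            }
            return client.put(element);
        }
    }

    public static void main(String[] args) {
        LazyConverter c = new LazyConverter();
        c.open(); // the framework would call this once per task before any convert()
        System.out.println(c.convert("a")); // stored:a
    }
}
```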
[GitHub] [flink] dannycranmer merged pull request #21265: [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter
dannycranmer merged PR #21265: URL: https://github.com/apache/flink/pull/21265
[GitHub] [flink] flinkbot commented on pull request #21366: [FLINK-30155][streaming] Pretty print MutatedConfigurationException
flinkbot commented on PR #21366: URL: https://github.com/apache/flink/pull/21366#issuecomment-1323993083 ## CI report: * 6d29e95a6af0f8848858df622efcc885f98650ad UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-30155) Pretty print MutatedConfigurationException
[ https://issues.apache.org/jira/browse/FLINK-30155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30155: --- Labels: pull-request-available (was: ) > Pretty print MutatedConfigurationException > -- > > Key: FLINK-30155 > URL: https://issues.apache.org/jira/browse/FLINK-30155 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.17.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Currently MutatedConfigurationException is printed as: > {noformat} > org.apache.flink.client.program.MutatedConfigurationException: > Configuration execution.sorted-inputs.enabled:true not allowed. > Configuration execution.runtime-mode was changed from STREAMING to BATCH. > Configuration execution.checkpointing.interval:500 ms not allowed in the > configuration object CheckpointConfig. > Configuration execution.checkpointing.mode:EXACTLY_ONCE not allowed in > the configuration object CheckpointConfig. > Configuration pipeline.max-parallelism:1024 not allowed in the > configuration object ExecutionConfig. > Configuration parallelism.default:25 not allowed in the configuration > object ExecutionConfig. > at > org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235) > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175) > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2027) > {noformat} > Which is slightly confusing. 
First not allowed configuration is listed in the > same line as the exception name, which (especially if wrapped) can make it > more difficult than necessary for user to understand that this is a list of > violations. I'm proposing to change it to: > {noformat} > org.apache.flink.client.program.MutatedConfigurationException: Not > allowed configuration change(s) were detected: > - Configuration execution.sorted-inputs.enabled:true not allowed. > - Configuration execution.runtime-mode was changed from STREAMING to > BATCH. > - Configuration execution.checkpointing.interval:500 ms not allowed in > the configuration object CheckpointConfig. > - Configuration execution.checkpointing.mode:EXACTLY_ONCE not allowed in > the configuration object CheckpointConfig. > - Configuration pipeline.max-parallelism:1024 not allowed in the > configuration object ExecutionConfig. > - Configuration parallelism.default:25 not allowed in the configuration > object ExecutionConfig. > at > org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235) > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175) > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2027) > {noformat} > To make it more clear. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] pnowojski opened a new pull request, #21366: [FLINK-30155][streaming] Pretty print MutatedConfigurationException
pnowojski opened a new pull request, #21366: URL: https://github.com/apache/flink/pull/21366 The new message will look like: org.apache.flink.client.program.MutatedConfigurationException: Not allowed configuration change(s) were detected: - Configuration execution.sorted-inputs.enabled:true not allowed. - Configuration execution.runtime-mode was changed from STREAMING to BATCH. - Configuration execution.checkpointing.interval:500 ms not allowed in the configuration object CheckpointConfig. - Configuration execution.checkpointing.mode:EXACTLY_ONCE not allowed in the configuration object CheckpointConfig. - Configuration pipeline.max-parallelism:1024 not allowed in the configuration object ExecutionConfig. - Configuration parallelism.default:25 not allowed in the configuration object ExecutionConfig. at org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2027) Instead of: org.apache.flink.client.program.MutatedConfigurationException: Configuration execution.sorted-inputs.enabled:true not allowed. Configuration execution.runtime-mode was changed from STREAMING to BATCH. Configuration execution.checkpointing.interval:500 ms not allowed in the configuration object CheckpointConfig. Configuration execution.checkpointing.mode:EXACTLY_ONCE not allowed in the configuration object CheckpointConfig. Configuration pipeline.max-parallelism:1024 not allowed in the configuration object ExecutionConfig. 
Configuration parallelism.default:25 not allowed in the configuration object ExecutionConfig. at org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2027) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-30155) Pretty print MutatedConfigurationException
Piotr Nowojski created FLINK-30155: -- Summary: Pretty print MutatedConfigurationException Key: FLINK-30155 URL: https://issues.apache.org/jira/browse/FLINK-30155 Project: Flink Issue Type: New Feature Components: API / DataStream Affects Versions: 1.17.0 Reporter: Piotr Nowojski Assignee: Piotr Nowojski Fix For: 1.17.0 Currently MutatedConfigurationException is printed as: {noformat} org.apache.flink.client.program.MutatedConfigurationException: Configuration execution.sorted-inputs.enabled:true not allowed. Configuration execution.runtime-mode was changed from STREAMING to BATCH. Configuration execution.checkpointing.interval:500 ms not allowed in the configuration object CheckpointConfig. Configuration execution.checkpointing.mode:EXACTLY_ONCE not allowed in the configuration object CheckpointConfig. Configuration pipeline.max-parallelism:1024 not allowed in the configuration object ExecutionConfig. Configuration parallelism.default:25 not allowed in the configuration object ExecutionConfig. at org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2027) {noformat} Which is slightly confusing. First not allowed configuration is listed in the same line as the exception name, which (especially if wrapped) can make it more difficult than necessary for user to understand that this is a list of violations. 
I'm proposing to change it to: {noformat} org.apache.flink.client.program.MutatedConfigurationException: Not allowed configuration change(s) were detected: - Configuration execution.sorted-inputs.enabled:true not allowed. - Configuration execution.runtime-mode was changed from STREAMING to BATCH. - Configuration execution.checkpointing.interval:500 ms not allowed in the configuration object CheckpointConfig. - Configuration execution.checkpointing.mode:EXACTLY_ONCE not allowed in the configuration object CheckpointConfig. - Configuration pipeline.max-parallelism:1024 not allowed in the configuration object ExecutionConfig. - Configuration parallelism.default:25 not allowed in the configuration object ExecutionConfig. at org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2027) {noformat} This makes it clearer at a glance that a list of violations follows. -- This message was sent by Atlassian Jira (v8.20.10#820010)
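The proposed layout above (a fixed first line, then one "- "-prefixed line per violation) can be sketched with a simple collector. The class and method names here are hypothetical; the real change would live inside `MutatedConfigurationException` itself.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Sketch of the proposed message layout: a header line followed by one
 * " - "-prefixed line per violation. Illustrative only; not the real
 * MutatedConfigurationException implementation.
 */
public class ViolationFormatter {

    static String format(List<String> violations) {
        // joining(delimiter, prefix, suffix) puts the header on its own line
        // and each violation on a fresh, dash-prefixed line.
        return violations.stream()
                .map(v -> " - " + v)
                .collect(Collectors.joining(
                        "\n",
                        "Not allowed configuration change(s) were detected:\n",
                        ""));
    }

    public static void main(String[] args) {
        System.out.println(format(List.of(
                "Configuration execution.sorted-inputs.enabled:true not allowed.",
                "Configuration parallelism.default:25 not allowed in the configuration object ExecutionConfig.")));
    }
}
```

Because each violation starts on its own line, terminal wrapping no longer merges the first violation into the exception name.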
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
snuyanzin commented on PR #20991: URL: https://github.com/apache/flink/pull/20991#issuecomment-1323964883 Or just cherry-pick the commits from this PR onto the rebased version.
[GitHub] [flink] rkhachatryan commented on pull request #21264: [FLINK-29928][runtime, state] Share RocksDB memory across TM slots
rkhachatryan commented on PR #21264: URL: https://github.com/apache/flink/pull/21264#issuecomment-1323953503 Rebased and resolved the conflict. @AlanConfluent do you have any further feedback?
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
ashmeet-kandhari commented on PR #20991: URL: https://github.com/apache/flink/pull/20991#issuecomment-1323938824 It doesn't show any diff; I tried checking the history as well and it shows nothing ![image](https://user-images.githubusercontent.com/37675804/203368442-7e42d3cb-8e2c-4a49-a195-aec35b66da7c.png) Maybe I will need to cancel the merge and try again?
[GitHub] [flink] snuyanzin commented on pull request #16532: [FLINK-13400]Remove Hive and Hadoop dependencies from SQL Client
snuyanzin commented on PR #16532: URL: https://github.com/apache/flink/pull/16532#issuecomment-1323929870 @yiduwangkai @lirui-apache @fsk119 I'd like to revive this a bit. Do you mind if I continue the work based on what was done here?
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
snuyanzin commented on PR #20991: URL: https://github.com/apache/flink/pull/20991#issuecomment-1323924577 What are the changes there? From my point of view they should not be changed and should not be part of this PR.
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
ashmeet-kandhari commented on PR #20991: URL: https://github.com/apache/flink/pull/20991#issuecomment-1323920168 Oh, not sure why they are here, but this is what I found in the rat.txt logs ``` 14 Unknown Licenses * Files with unapproved licenses: flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/empty-v1/snapshot flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/empty-v2/snapshot flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot flink-connectors/flink-connector-files/src/test/resources/committable-serializer-migration/in-progress-v1/committable flink-connectors/flink-connector-files/src/test/resources/committable-serializer-migration/pending-v1/committable flink-connectors/flink-file-sink-common/src/test/resources/recoverable-serializer-migration/in-progress-v1/recoverable flink-connectors/flink-file-sink-common/src/test/resources/recoverable-serializer-migration/pending-v1/recoverable flink-core/src/test/resources/abstractID-with-toString-field flink-core/src/test/resources/abstractID-with-toString-field-set flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v1/snapshot flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v2/snapshot flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot ```
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
snuyanzin commented on PR #20991: URL: https://github.com/apache/flink/pull/20991#issuecomment-1323917212 It means that there are some new files in your PR without a license header... I guess there should not be any new files within this PR. You can find the list of new files in the already mentioned `D:\Projects\Intellij\git\flink\target\rat.txt`
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
ashmeet-kandhari commented on PR #20991: URL: https://github.com/apache/flink/pull/20991#issuecomment-1323914415 Hi @snuyanzin I tried resolving the conflicts (haven't pushed yet), but when I try to build locally I get the following error ``` Too many files with unapproved license: 14 See RAT report in: D:\Projects\Intellij\git\flink\target\rat.txt ```
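The RAT failure discussed in this thread comes down to a simple check: the build scans every non-excluded file for the ASF license marker and lists the ones that lack it in rat.txt. The sketch below is a self-contained miniature of that idea for illustration only; the real check is performed by the apache-rat plugin during the Maven build, and `MiniRatCheck` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Miniature, illustrative version of what the rat.txt report contains:
 * files whose contents do not carry the ASF license marker.
 */
public class MiniRatCheck {

    static final String MARKER = "Licensed to the Apache Software Foundation";

    /** Returns true when the file contents carry the ASF license marker. */
    static boolean hasLicense(String fileContents) {
        return fileContents.contains(MARKER);
    }

    /** Collects files under root lacking the marker, like rat.txt's "unapproved" list. */
    static List<Path> unapproved(Path root) throws IOException {
        try (Stream<Path> files = Files.walk(root)) {
            return files.filter(Files::isRegularFile)
                    .filter(p -> {
                        try {
                            return !hasLicense(Files.readString(p));
                        } catch (IOException e) {
                            return true; // unreadable (e.g. binary) counts as unapproved
                        }
                    })
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        System.out.println(hasLicense("/* Licensed to the Apache Software Foundation (ASF) ... */"));
        System.out.println(hasLicense("binary snapshot data"));
    }
}
```

This also explains the file list in the thread: binary test resources such as `snapshot` and `committable` files cannot carry a header, which is why such files are normally listed in the plugin's exclusions rather than given a license block.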
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
reta commented on code in PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029500963 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java: ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.opensearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.StringUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION; +import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FORMAT_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION; +import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; +import static org.opensearch.common.Strings.capitalize; + +/** A {@link DynamicTableSinkFactory} for discovering OpensearchDynamicSink. */ +@Internal +public class OpensearchDynamicSinkFactory implements DynamicTableSinkFactory { +private static final String FACTORY_IDENTIFIER = "opensearch"; + +private final OpensearchSinkBuilderSupplier sinkBuilderSupplier; + +public OpensearchDynamicSinkFactory() { +this.sinkBuilderSupplier =
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
reta commented on code in PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029481239 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java: ## @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.http.HttpHost; +import org.opensearch.action.ActionListener; +import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.action.bulk.BulkProcessor; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.unit.TimeValue; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Builder to construct an Opensearch compatible {@link OpensearchSink}. + * + * The following example shows the minimal setup to create a OpensearchSink that submits actions + * on checkpoint or the default number of actions was buffered (1000). 
+ * + * {@code + * OpensearchSink sink = new OpensearchSinkBuilder() + * .setHosts(new HttpHost("localhost:9200") + * .setEmitter((element, context, indexer) -> { + * indexer.add( + * new IndexRequest("my-index") + * .id(element.f0.toString()) + * .source(element.f1) + * ); + * }) + * .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + * .build(); + * } + * + * @param type of the records converted to Opensearch actions + */ +@PublicEvolving +public class OpensearchSinkBuilder { + +private int bulkFlushMaxActions = 1000; +private int bulkFlushMaxMb = -1; +private long bulkFlushInterval = -1; +private FlushBackoffType bulkFlushBackoffType = FlushBackoffType.NONE; +private int bulkFlushBackoffRetries = -1; +private long bulkFlushBackOffDelay = -1; +private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE; +private List hosts; +protected OpensearchEmitter emitter; +private String username; +private String password; +private String connectionPathPrefix; +private Integer connectionTimeout; +private Integer connectionRequestTimeout; +private Integer socketTimeout; +private Boolean allowInsecure; + +public OpensearchSinkBuilder() {} + +@SuppressWarnings("unchecked") +protected > S self() { +return (S) this; +} + +/** + * Sets the emitter which is invoked on every record to convert it to Opensearch actions. + * + * @param emitter to process records into Opensearch actions. + * @return this builder + */ +public OpensearchSinkBuilder setEmitter( +OpensearchEmitter emitter) { +checkNotNull(emitter); +checkState( +InstantiationUtil.isSerializable(emitter), +"The Opensearch emitter must be serializable."); + +final OpensearchSinkBuilder self = self(); +self.emitter = emitter; +return self; +} + +/** + * Sets the hosts where the Opensearch cluster nodes are reachable. + * + * @param hosts http addresses describing the node locations + * @return this builder + */ +public OpensearchSinkBuilder setHosts(HttpHost... 
hosts) { +checkNotNull(hosts); +checkState(hosts.length > 0, "Hosts cannot be empty."); +this.hosts = Arrays.asList(hosts); +return self(); +} + +/** + * Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is
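The pasted `OpensearchSinkBuilder` diff above relies on a recursive-generics `self()` trick: setters return the concrete builder type so a subclass builder keeps its own type across chained calls. A self-contained sketch of the pattern, with illustrative names rather than the connector's real API (only the `self()` shape and the 1000-action default are taken from the diff):

```java
/**
 * Sketch of the "self()" builder pattern from the pasted diff: base-class
 * setters return S, the concrete builder type, so chaining never downcasts.
 */
public class SelfTypedBuilderSketch {

    static class BaseBuilder<S extends BaseBuilder<S>> {
        int bulkFlushMaxActions = 1000; // same default as the pasted code
        String host;

        @SuppressWarnings("unchecked")
        protected S self() {
            return (S) this; // safe as long as subclasses bind S to themselves
        }

        public S setBulkFlushMaxActions(int n) {
            this.bulkFlushMaxActions = n;
            return self();
        }

        public S setHost(String host) {
            this.host = host;
            return self();
        }
    }

    /** A concrete subclass keeps chaining its own methods without casts. */
    static class MyBuilder extends BaseBuilder<MyBuilder> {
        public MyBuilder enableRetries() {
            return this;
        }
    }

    public static void main(String[] args) {
        MyBuilder b = new MyBuilder()
                .setHost("localhost:9200") // base-class setter...
                .enableRetries();          // ...still returns MyBuilder
        System.out.println(b.host + " " + b.bulkFlushMaxActions);
    }
}
```

Without `self()`, `setHost` would return `BaseBuilder`, and the chained `enableRetries()` call would not compile.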
[jira] [Resolved] (FLINK-30145) Evaluate operator error log whitelist entry: JmDeploymentStatus.ERROR.Count
[ https://issues.apache.org/jira/browse/FLINK-30145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi resolved FLINK-30145. --- Resolution: Won't Fix No action needed since this is just a metric > Evaluate operator error log whitelist entry: JmDeploymentStatus.ERROR.Count > --- > > Key: FLINK-30145 > URL: https://issues.apache.org/jira/browse/FLINK-30145 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gabor Somogyi >Priority: Major >
[jira] [Closed] (FLINK-30145) Evaluate operator error log whitelist entry: JmDeploymentStatus.ERROR.Count
[ https://issues.apache.org/jira/browse/FLINK-30145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi closed FLINK-30145. - Assignee: Gabor Somogyi > Evaluate operator error log whitelist entry: JmDeploymentStatus.ERROR.Count > --- > > Key: FLINK-30145 > URL: https://issues.apache.org/jira/browse/FLINK-30145 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gabor Somogyi >Assignee: Gabor Somogyi >Priority: Major >
[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
reta commented on code in PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029467392 ## flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/util/IgnoringFailureHandler.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.opensearch.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.opensearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.opensearch.RequestIndexer; + +import org.opensearch.action.ActionRequest; + +/** Ignores all kinds of failures and drops the affected {@link ActionRequest}. */ +@Internal +public class IgnoringFailureHandler implements ActionRequestFailureHandler { Review Comment: Sure, thanks, I will drop them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-30116) Don't Show Env Vars in Web UI
[ https://issues.apache.org/jira/browse/FLINK-30116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-30116: Assignee: ConradJam > Don't Show Env Vars in Web UI > - > > Key: FLINK-30116 > URL: https://issues.apache.org/jira/browse/FLINK-30116 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.16.0 >Reporter: Konstantin Knauf >Assignee: ConradJam >Priority: Critical > Fix For: 1.16.1 > > > As discussed and agreed upon in [1], we'd like to revert [2] and not show any > environment variables in the Web UI for security reasons. > [1] https://lists.apache.org/thread/rjgk15bqttvblp60zry4n5pw4xjw7q9k > [2] https://issues.apache.org/jira/browse/FLINK-28311
[jira] [Updated] (FLINK-30116) Don't Show Env Vars in Web UI
[ https://issues.apache.org/jira/browse/FLINK-30116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-30116: - Fix Version/s: 1.17.0 > Don't Show Env Vars in Web UI > - > > Key: FLINK-30116 > URL: https://issues.apache.org/jira/browse/FLINK-30116 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.16.0 >Reporter: Konstantin Knauf >Assignee: ConradJam >Priority: Critical > Fix For: 1.17.0, 1.16.1 > > > As discussed and agreed upon in [1], we'd like to revert [2] and not show any > environment variables in the Web UI for security reasons. > [1] https://lists.apache.org/thread/rjgk15bqttvblp60zry4n5pw4xjw7q9k > [2] https://issues.apache.org/jira/browse/FLINK-28311
[jira] [Updated] (FLINK-30116) Don't Show Env Vars in Web UI
[ https://issues.apache.org/jira/browse/FLINK-30116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-30116: - Priority: Blocker (was: Critical) > Don't Show Env Vars in Web UI > - > > Key: FLINK-30116 > URL: https://issues.apache.org/jira/browse/FLINK-30116 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.16.0 >Reporter: Konstantin Knauf >Assignee: ConradJam >Priority: Blocker > Fix For: 1.17.0, 1.16.1 > > > As discussed and agreed upon in [1], we'd like to revert [2] and not show any > environment variables in the Web UI for security reasons. > [1] https://lists.apache.org/thread/rjgk15bqttvblp60zry4n5pw4xjw7q9k > [2] https://issues.apache.org/jira/browse/FLINK-28311
[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
zentol commented on code in PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029448617 ## flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/util/IgnoringFailureHandler.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.opensearch.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.opensearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.opensearch.RequestIndexer; + +import org.opensearch.action.ActionRequest; + +/** Ignores all kinds of failures and drops the affected {@link ActionRequest}. */ +@Internal +public class IgnoringFailureHandler implements ActionRequestFailureHandler { Review Comment: Yes and no. A user _could_ set the failure handler, but this implementation here is marked as `@Internal`; presumably because they were targeting the Table API connector. 
See `ElasticsearchConfiguration#getFailureHandler` _If_ we want to be a drop-in replacement for the ES table api connector, then we'd need to pull in all of FLINK-26638, which also includes going back to the old ES sink for the Table API. It's a bit of an unsolved question on the ES connector side as to what will happen with these failure handlers :/
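For readers following this thread, the behavior of the `IgnoringFailureHandler` under review is easy to picture with a self-contained mirror of the contract. The simplified interfaces below are assumptions for illustration only; the real types are the connector's `ActionRequestFailureHandler` and `RequestIndexer`, which operate on `ActionRequest` instances rather than strings.

```java
/**
 * Self-contained mirror of the failure-handler contract under discussion.
 * Simplified shapes, not the connector's real API: the real onFailure
 * receives an ActionRequest and can re-add it through the RequestIndexer.
 */
public class FailureHandlerSketch {

    interface RequestIndexer {
        void add(String request); // simplified stand-in for ActionRequest re-indexing
    }

    interface FailureHandler {
        void onFailure(String action, Throwable failure, int restStatusCode, RequestIndexer indexer);
    }

    /** Mirrors IgnoringFailureHandler: swallow the failure, re-index nothing. */
    static class IgnoringHandler implements FailureHandler {
        int dropped;

        @Override
        public void onFailure(String action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
            dropped++; // the affected request is silently lost
        }
    }

    public static void main(String[] args) {
        IgnoringHandler handler = new IgnoringHandler();
        handler.onFailure("index {doc}", new RuntimeException("bulk item failed"), 429, request -> { });
        System.out.println(handler.dropped);
    }
}
```

A retrying handler would instead call `indexer.add(action)` inside `onFailure`, which is exactly the extension point whose future is being debated in this review.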
[jira] [Commented] (FLINK-30062) Move existing HBase connector code from Flink repo to dedicated HBase repo
[ https://issues.apache.org/jira/browse/FLINK-30062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17637303#comment-17637303 ] Martijn Visser commented on FLINK-30062: [~ferenc-csaky] The repo is created and I've assigned this issue to you > Move existing HBase connector code from Flink repo to dedicated HBase repo > -- > > Key: FLINK-30062 > URL: https://issues.apache.org/jira/browse/FLINK-30062 > Project: Flink > Issue Type: Sub-task > Components: Connectors / HBase >Reporter: Martijn Visser >Assignee: Ferenc Csaky >Priority: Major >
[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #1: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors
zentol commented on code in PR #1: URL: https://github.com/apache/flink-connector-opensearch/pull/1#discussion_r1029441900 ## flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchDynamicSinkFactory.java: ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.opensearch.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.StringUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.ALLOW_INSECURE; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION; +import static 
org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.CONNECTION_TIMEOUT; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.DELIVERY_GUARANTEE_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.FORMAT_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.HOSTS_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.INDEX_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.KEY_DELIMITER_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.SOCKET_TIMEOUT; +import static org.apache.flink.connector.opensearch.table.OpensearchConnectorOptions.USERNAME_OPTION; +import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; +import static org.opensearch.common.Strings.capitalize; + +/** A {@link DynamicTableSinkFactory} for discovering OpensearchDynamicSink. */ +@Internal +public class OpensearchDynamicSinkFactory implements DynamicTableSinkFactory { +private static final String FACTORY_IDENTIFIER = "opensearch"; + +private final OpensearchSinkBuilderSupplier sinkBuilderSupplier; + +public OpensearchDynamicSinkFactory() { +this.sinkBuilderSupplier =
[jira] [Closed] (FLINK-30061) Create and initialize repository for HBase connector
[ https://issues.apache.org/jira/browse/FLINK-30061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-30061. -- Fix Version/s: hbase-3.0.0 Resolution: Fixed Repository created at https://github.com/apache/flink-connector-hbase Init commit: 4faad76941b29e1ac692a5dffc04ccae44657a0c > Create and initialize repository for HBase connector > > > Key: FLINK-30061 > URL: https://issues.apache.org/jira/browse/FLINK-30061 > Project: Flink > Issue Type: Sub-task > Components: Connectors / HBase >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Fix For: hbase-3.0.0 > > > * Create repository > * Init repo and setup CI