[GitHub] [flink-table-store] FangYongs closed pull request #313: [FLINK-27958] Compare batch maxKey to reduce comparisons in SortMergeReader

2023-02-10 Thread via GitHub


FangYongs closed pull request #313: [FLINK-27958] Compare batch maxKey to 
reduce comparisons in SortMergeReader
URL: https://github.com/apache/flink-table-store/pull/313


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30979) The buckets of the secondary partition should fall on different tasks

2023-02-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30979:
---
Labels: pull-request-available  (was: )

> The buckets of the secondary partition should fall on different tasks
> -
>
> Key: FLINK-30979
> URL: https://issues.apache.org/jira/browse/FLINK-30979
> Project: Flink
>  Issue Type: Improvement
>  Components: Table Store
>Reporter: Jingsong Lee
>Assignee: Shammon
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.4.0
>
>
> In a Flink streaming job, we sink to the table store.
> Considering that I set only one bucket but there are many secondary 
> partitions, I expect to use multiple parallel tasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] FangYongs opened a new pull request, #522: [FLINK-30979] Support shuffling data by partition

2023-02-10 Thread via GitHub


FangYongs opened a new pull request, #522:
URL: https://github.com/apache/flink-table-store/pull/522

   Currently the sink operator in Flink shuffles data by bucket id, which causes 
data skew when there is only 1 bucket with multiple partitions in the table. 
This PR aims to support shuffling data by bucket id and partition when 
`sink.shuffle-by-partition.enable` is set (a sketch of the resulting channel 
selection follows the change list below).
   
   The main changes are
   1. Added config `sink.shuffle-by-partition.enable` to support shuffling data 
by partition
   2. Added `PartitionComputer` to get partition from row data
   3. Added shuffling data by partition in `BucketStreamPartitioner`
   
   The main tests are
   1. Added `FileStoreShuffleBucketTest` to shuffle data by bucket and partition
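
   A minimal sketch of the channel selection described above, under stated 
assumptions: only `sink.shuffle-by-partition.enable` comes from the PR; the 
class name, method, and hash logic here are illustrative, not the PR's code.

   ```java
   import java.util.Objects;

   // Illustrative only: how shuffling by partition + bucket could pick a channel.
   public class BucketChannelSelector {
       private final boolean shuffleByPartition; // sink.shuffle-by-partition.enable
       private final int numChannels;

       public BucketChannelSelector(boolean shuffleByPartition, int numChannels) {
           this.shuffleByPartition = shuffleByPartition;
           this.numChannels = numChannels;
       }

       public int selectChannel(String partition, int bucket) {
           // With a single bucket and no partition in the hash, every record
           // maps to the same channel -- the data skew this PR addresses.
           int hash = shuffleByPartition ? Objects.hash(partition, bucket) : Integer.hashCode(bucket);
           return Math.floorMod(hash, numChannels);
       }
   }
   ```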
   





[jira] [Commented] (FLINK-31021) JavaCodeSplitter doesn't split static method properly

2023-02-10 Thread Krzysztof Chmielewski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687368#comment-17687368
 ] 

Krzysztof Chmielewski commented on FLINK-31021:
---

Hi, I have a few questions.
1. Could you provide the full body of the original decode method?
2. Do you have a SQL query that reproduces the problem?
3. You marked the affected versions as 1.16.1 and below. Did you in fact hit 
this on those versions or on the current master? I'm asking because a change to 
the code splitter was recently merged to master (1.17) and the 1.16 release 
branch that is not included in 1.16.1, so I'm wondering whether this is a 
regression or something new.

Let me know,
Cheers.

> JavaCodeSplitter doesn't split static method properly
> -
>
> Key: FLINK-31021
> URL: https://issues.apache.org/jira/browse/FLINK-31021
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.4, 1.15.3, 1.16.1
>Reporter: Xingcan Cui
>Priority: Minor
>
> The exception while compiling the generated source
> {code:java}
> cause=org.codehaus.commons.compiler.CompileException: Line 3383, Column 90: 
> Instance method "default void 
> org.apache.flink.formats.protobuf.deserialize.GeneratedProtoToRow_655d75db1cf943838f5500013edfba82.decodeImpl(foo.bar.LogData)"
>  cannot be invoked in static context,{code}
> The original method header 
> {code:java}
> public static RowData decode(foo.bar.LogData message){{code}
> The code after split
>  
> {code:java}
> Line 3383: public static RowData decode(foo.bar.LogData message){ 
> decodeImpl(message); return decodeReturnValue$0; } 
> Line 3384:
> Line 3385: void decodeImpl(foo.bar.LogData message) {{code}
>  
>  
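
Schematically, the fix is for the splitter to keep the {{static}} modifier on 
the extracted implementation method. A sketch based only on the snippets above, 
not the actual splitter output:

{code:java}
// Broken: the extracted implementation lost its static modifier, so the
// static stub cannot call it.
public static RowData decode(foo.bar.LogData message) { decodeImpl(message); return decodeReturnValue$0; }
void decodeImpl(foo.bar.LogData message) { /* ... */ }

// Expected: the extracted method keeps the static modifier of the original.
static void decodeImpl(foo.bar.LogData message) { /* ... */ }
{code}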





[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103520528


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##
@@ -231,7 +232,11 @@ public void 
handleSplitsChanges(SplitsChange splitsChanges
 }
 
 // Create pulsar consumer.
-this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
+try {
+this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
+} catch (PulsarClientException e) {

Review Comment:
   This should be the difference between the Kafka client and the Pulsar client. 
Kafka uses a polling API, and the client is created before handling the split. 
Pulsar shares the consumers in the same client instance, and every consumer 
supports only one split. So we have to create the consumer here, and the 
exception has to be wrapped into a runtime exception.
   
   I think we should expose exceptions in `SplitReader.handleSplitsChanges` on 
the Flink side.
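
   Concretely, the pattern the diff above lands on looks like this (a sketch; 
`FlinkRuntimeException` is `org.apache.flink.util.FlinkRuntimeException`):

   ```java
   // SplitReader#handleSplitsChanges declares no checked exception, so the
   // PulsarClientException has to surface as an unchecked failure.
   try {
       this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
   } catch (PulsarClientException e) {
       throw new FlinkRuntimeException("Failed to create the Pulsar consumer", e);
   }
   ```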






[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103523033


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##
@@ -220,11 +223,14 @@ private void createSubscription(List 
newPartitions) {
 CursorPosition position =
 startCursor.position(partition.getTopic(), 
partition.getPartitionId());
 
-if (sourceConfiguration.isResetSubscriptionCursor()) {
-sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, 
subscriptionName));
-} else {
-sneakyAdmin(
-() -> position.createInitialPosition(pulsarAdmin, 
topic, subscriptionName));
+try {
+if (sourceConfiguration.isResetSubscriptionCursor()) {
+position.seekPosition(pulsarAdmin, topic, 
subscriptionName);
+} else {
+position.createInitialPosition(pulsarAdmin, topic, 
subscriptionName);
+}
+} catch (PulsarAdminException e) {
+throw new FlinkRuntimeException(e);

Review Comment:
   My bad, we can still throw this exception with a little more work on code 
refactoring.



[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103522744


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##
@@ -220,11 +223,14 @@ private void createSubscription(List 
newPartitions) {
 CursorPosition position =
 startCursor.position(partition.getTopic(), 
partition.getPartitionId());
 
-if (sourceConfiguration.isResetSubscriptionCursor()) {
-sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, 
subscriptionName));
-} else {
-sneakyAdmin(
-() -> position.createInitialPosition(pulsarAdmin, 
topic, subscriptionName));
+try {
+if (sourceConfiguration.isResetSubscriptionCursor()) {
+position.seekPosition(pulsarAdmin, topic, 
subscriptionName);
+} else {
+position.createInitialPosition(pulsarAdmin, topic, 
subscriptionName);
+}
+} catch (PulsarAdminException e) {
+throw new FlinkRuntimeException(e);

Review Comment:
   This method is called in 
`context.callAsync(this::getSubscribedTopicPartitions, 
this::checkPartitionChanges);`. It's hard to throw the exception here.
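
   For context, `SplitEnumeratorContext#callAsync` takes a `Callable` and a 
`BiConsumer` handler; a failure raised by the callable is delivered to the 
handler instead of propagating, which is why a checked exception cannot simply 
be rethrown here. A sketch (the element type is assumed):

   ```java
   // The callable runs on a worker thread; any Throwable it raises arrives as
   // the second argument of the handler rather than at the call site.
   context.callAsync(
           this::getSubscribedTopicPartitions,   // Callable<Set<TopicPartition>>
           this::checkPartitionChanges);         // BiConsumer<Set<TopicPartition>, Throwable>
   ```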






[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103522426


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java:
##
@@ -619,7 +619,7 @@ private void ensureSubscriberIsNull(String 
attemptingSubscribeMode) {
 
 private void ensureSchemaTypeIsValid(Schema schema) {
 SchemaInfo info = schema.getSchemaInfo();
-if (info.getType() == SchemaType.AUTO_CONSUME || info.getType() == 
SchemaType.AUTO) {
+if (info.getType() == SchemaType.AUTO_CONSUME) {

Review Comment:
   I just noticed this is a deprecated API call in Pulsar that could never 
happen. Since this is a code-refactoring PR, I added the change here instead of 
submitting a new PR.



[jira] [Closed] (FLINK-30993) Introduce FloatSerializer for Table Store

2023-02-10 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-30993.

  Assignee: Feng Wang
Resolution: Fixed

master: c62992cef568274ec777c2d6e264a12f077b0925

> Introduce FloatSerializer for Table Store
> -
>
> Key: FLINK-30993
> URL: https://issues.apache.org/jira/browse/FLINK-30993
> Project: Flink
>  Issue Type: New Feature
>  Components: Table Store
>Reporter: Feng Wang
>Assignee: Feng Wang
>Priority: Major
> Fix For: table-store-0.4.0
>
>
> Introduce FloatSerializer for Table Store





[jira] [Closed] (FLINK-30992) Introduce ShortSerializer for Table Store

2023-02-10 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-30992.

  Assignee: Feng Wang
Resolution: Fixed

master: f285db56b8a440ef4ef08d15b5a48d58b7c10e01

> Introduce ShortSerializer for Table Store
> -
>
> Key: FLINK-30992
> URL: https://issues.apache.org/jira/browse/FLINK-30992
> Project: Flink
>  Issue Type: New Feature
>  Components: Table Store
>Reporter: Feng Wang
>Assignee: Feng Wang
>Priority: Major
> Fix For: table-store-0.4.0
>
>
> Introduce ShortSerializer for Table Store





[GitHub] [flink-table-store] JingsongLi merged pull request #511: [Flink-30993] Introduce FloatSerializer for Table Store

2023-02-10 Thread via GitHub


JingsongLi merged PR #511:
URL: https://github.com/apache/flink-table-store/pull/511





[jira] [Closed] (FLINK-30991) Introduce LongSerializer for Table Store

2023-02-10 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-30991.

Resolution: Fixed

master: 430df6c595c229a7128a43c5a1dc831d3f91f905

> Introduce LongSerializer for Table Store
> 
>
> Key: FLINK-30991
> URL: https://issues.apache.org/jira/browse/FLINK-30991
> Project: Flink
>  Issue Type: New Feature
>  Components: Table Store
>Reporter: Feng Wang
>Assignee: Feng Wang
>Priority: Major
> Fix For: table-store-0.4.0
>
>
> Introduce LongSerializer for Table Store





[GitHub] [flink-table-store] JingsongLi merged pull request #510: [Flink-30992] Introduce ShortSerializer for Table Store

2023-02-10 Thread via GitHub


JingsongLi merged PR #510:
URL: https://github.com/apache/flink-table-store/pull/510





[GitHub] [flink-table-store] JingsongLi merged pull request #509: [Flink-30991] Introduce LongSerializer for Table Store

2023-02-10 Thread via GitHub


JingsongLi merged PR #509:
URL: https://github.com/apache/flink-table-store/pull/509





[jira] [Updated] (FLINK-31021) JavaCodeSplitter doesn't split static method properly

2023-02-10 Thread Xingcan Cui (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-31021:

Affects Version/s: 1.15.3

> JavaCodeSplitter doesn't split static method properly
> -
>
> Key: FLINK-31021
> URL: https://issues.apache.org/jira/browse/FLINK-31021
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.14.4, 1.15.3, 1.16.1
>Reporter: Xingcan Cui
>Priority: Minor
>
> The exception while compiling the generated source
> {code:java}
> cause=org.codehaus.commons.compiler.CompileException: Line 3383, Column 90: 
> Instance method "default void 
> org.apache.flink.formats.protobuf.deserialize.GeneratedProtoToRow_655d75db1cf943838f5500013edfba82.decodeImpl(foo.bar.LogData)"
>  cannot be invoked in static context,{code}
> The original method header 
> {code:java}
> public static RowData decode(foo.bar.LogData message){{code}
> The code after split
>  
> {code:java}
> Line 3383: public static RowData decode(foo.bar.LogData message){ 
> decodeImpl(message); return decodeReturnValue$0; } 
> Line 3384:
> Line 3385: void decodeImpl(foo.bar.LogData message) {{code}
>  
>  





[jira] [Created] (FLINK-31021) JavaCodeSplitter doesn't split static method properly

2023-02-10 Thread Xingcan Cui (Jira)
Xingcan Cui created FLINK-31021:
---

 Summary: JavaCodeSplitter doesn't split static method properly
 Key: FLINK-31021
 URL: https://issues.apache.org/jira/browse/FLINK-31021
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.1, 1.14.4
Reporter: Xingcan Cui


The exception while compiling the generated source
{code:java}
cause=org.codehaus.commons.compiler.CompileException: Line 3383, Column 90: 
Instance method "default void 
org.apache.flink.formats.protobuf.deserialize.GeneratedProtoToRow_655d75db1cf943838f5500013edfba82.decodeImpl(foo.bar.LogData)"
 cannot be invoked in static context,{code}
The original method header 
{code:java}
public static RowData decode(foo.bar.LogData message){{code}
The code after split
 
{code:java}
Line 3383: public static RowData decode(foo.bar.LogData message){ 
decodeImpl(message); return decodeReturnValue$0; } 
Line 3384:
Line 3385: void decodeImpl(foo.bar.LogData message) {{code}
 
 





[GitHub] [flink] shuiqiangchen commented on pull request #21897: [FLINK-30922][table-planner] Apply persisted columns when doing appendPartitionAndNu…

2023-02-10 Thread via GitHub


shuiqiangchen commented on PR #21897:
URL: https://github.com/apache/flink/pull/21897#issuecomment-1426584980

   @MartijnVisser Thank you for the reminder. It worked after rebasing.





[jira] [Comment Edited] (FLINK-31003) Flink SQL IF / CASE WHEN Function incorrect

2023-02-10 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687347#comment-17687347
 ] 

Shuiqiang Chen edited comment on FLINK-31003 at 2/11/23 2:51 AM:
-

Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as 
[FLINK-30966|https://issues.apache.org/jira/browse/FLINK-30966]: when 
normalizing arguments in IfCallGen, the result is always aligned to the type of 
ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', 
whose length is the same as 'true'.


was (Author: csq):
Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as 
[FLINK-30966 title|https://issues.apache.org/jira/browse/FLINK-30966]: when 
normalizing arguments in IfCallGen, the result is always aligned to the type of 
ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', 
whose length is the same as 'true'.
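
A minimal self-contained illustration of the described truncation (plain Java, 
just to mimic the symptom; the actual bug lives in the code generated by 
IfCallGen):

{code:java}
public class IfTruncationDemo {
    public static void main(String[] args) {
        String arg1 = "true";
        String arg2 = "false";
        // Emulates the buggy normalization: the result capacity is taken
        // from ARG1 ('true', 4 chars), so ARG2 ('false') loses its last char.
        String result = arg2.substring(0, Math.min(arg2.length(), arg1.length()));
        System.out.println(result); // prints "fals"
    }
}
{code}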

> Flink SQL IF / CASE WHEN Function incorrect
> 
>
> Key: FLINK-31003
> URL: https://issues.apache.org/jira/browse/FLINK-31003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>Reporter: weiqinpan
>Priority: Major
>
> When I execute the SQL below using sql-client, I find something wrong.
>  
> {code:java}
> CREATE TEMPORARY TABLE source (
>   mktgmsg_biz_type STRING,
>   marketing_flow_id STRING,
>   mktgmsg_campaign_id STRING
> )
> WITH
> (
>   'connector' = 'filesystem',
>   'path' = 'file:///Users/xxx/Desktop/demo.json',
>   'format' = 'json'
> ); 
> -- return correct value('marketing_flow_id') 
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM 
> source;
> -- return incorrect value('')
> SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM 
> source;{code}
> The demo.json data is 
>  
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": 
> "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
>  
>  
> BTW, using CASE WHEN with IF / IFNULL also behaves incorrectly.
>  
> {code:java}
> -- return wrong value(''), expect return marketing_flow_id
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` 
> IS NULL, `marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return wrong value('')
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return correct value, the difference is [else return ' ']
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ' '
>   END AS `message_campaign_instance_id` FROM source;
> {code}
>  
>  
>  





[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Function incorrect

2023-02-10 Thread Shuiqiang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687347#comment-17687347
 ] 

Shuiqiang Chen commented on FLINK-31003:


Hi [~martijnvisser] [~weiqinpan], I think it is the same issue as 
[FLINK-30966|https://issues.apache.org/jira/browse/FLINK-30966]: when 
normalizing arguments in IfCallGen, the result is always aligned to the type of 
ARG1, so for IF(1 > 2, 'true', 'false') the result will be the string 'fals', 
whose length is the same as 'true'.

> Flink SQL IF / CASE WHEN Function incorrect
> 
>
> Key: FLINK-31003
> URL: https://issues.apache.org/jira/browse/FLINK-31003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>Reporter: weiqinpan
>Priority: Major
>
> When I execute the SQL below using sql-client, I find something wrong.
>  
> {code:java}
> CREATE TEMPORARY TABLE source (
>   mktgmsg_biz_type STRING,
>   marketing_flow_id STRING,
>   mktgmsg_campaign_id STRING
> )
> WITH
> (
>   'connector' = 'filesystem',
>   'path' = 'file:///Users/xxx/Desktop/demo.json',
>   'format' = 'json'
> ); 
> -- return correct value('marketing_flow_id') 
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM 
> source;
> -- return incorrect value('')
> SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM 
> source;{code}
> The demo.json data is 
>  
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": 
> "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
>  
>  
> BTW, using CASE WHEN with IF / IFNULL also behaves incorrectly.
>  
> {code:java}
> -- return wrong value(''), expect return marketing_flow_id
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` 
> IS NULL, `marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return wrong value('')
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return correct value, the difference is [else return ' ']
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ' '
>   END AS `message_campaign_instance_id` FROM source;
> {code}
>  
>  
>  





[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-02-10 Thread Andriy Redko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687346#comment-17687346
 ] 

Andriy Redko commented on FLINK-30998:
--

[~lilyevsky] thanks, you could send the pull request to 
[https://github.com/apache/flink-connector-opensearch/] and we could work it 
through. Are any other options more comfortable for you? Thanks!

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Priority: Major
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest adding an option to set a failure handler, similar to the way it is 
> done in the Elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is a use case example where it would be very useful. We are using 
> streams on the Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this is how it works for us with Elasticsearch).
> However, with the Opensearch connector, the error comes back saying that the 
> batch failed (even though most of the documents were indexed and only the ones 
> with duplicated IDs were rejected), and the whole Flink job fails.
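
A hypothetical sketch of what such an option could look like, modeled on the 
Elasticsearch connector's failure-handler pattern; none of these names exist in 
the Opensearch connector today, and the error string is an assumption:

{code:java}
// Assumed interface: decide per failed request whether to swallow or rethrow.
public interface BulkFailureHandler extends java.io.Serializable {
    void onFailure(Object failedRequest, Throwable failure) throws Throwable;
}

// Usage sketch: ignore duplicate-ID rejections, fail the job on anything else.
BulkFailureHandler ignoreDuplicates =
        (request, failure) -> {
            String msg = String.valueOf(failure.getMessage());
            // "version_conflict_engine_exception" is assumed to mark duplicates.
            if (!msg.contains("version_conflict_engine_exception")) {
                throw failure;
            }
        };
{code}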





[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-02-10 Thread Leonid Ilyevsky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687343#comment-17687343
 ] 

Leonid Ilyevsky commented on FLINK-30998:
-

[~reta] Thanks, please let me know how to submit my change. I can do it on 
Monday.

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Priority: Major
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest adding an option to set a failure handler, similar to the way it is 
> done in the Elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is a use case example where it would be very useful. We are using 
> streams on the Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this is how it works for us with Elasticsearch).
> However, with the Opensearch connector, the error comes back saying that the 
> batch failed (even though most of the documents were indexed and only the ones 
> with duplicated IDs were rejected), and the whole Flink job fails.





[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103481149


##
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java:
##
@@ -46,17 +45,18 @@ private PulsarTransactionUtils() {
 /** Create transaction with given timeout millis. */
 public static Transaction createTransaction(PulsarClient pulsarClient, 
long timeoutMs) {
 try {
-CompletableFuture future =
-sneakyClient(pulsarClient::newTransaction)
-.withTransactionTimeout(timeoutMs, 
TimeUnit.MILLISECONDS)
-.build();
-
-return future.get();
+return pulsarClient
+.newTransaction()
+.withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+.build()
+.get();
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new IllegalStateException(e);
 } catch (ExecutionException e) {
 throw new FlinkRuntimeException(unwrap(e));
+} catch (PulsarClientException e) {

Review Comment:
   You are right. I changed it to Pulsar's built-in handle method.
   
   ```java
   public static Transaction createTransaction(PulsarClient pulsarClient, long 
timeoutMs)
   throws PulsarClientException {
   try {
   return pulsarClient
   .newTransaction()
   .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
   .build()
   .get();
   } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
   throw new PulsarClientException(e);
   } catch (Exception e) {
   throw PulsarClientException.unwrap(e);
   }
   }
   ```



[jira] [Updated] (FLINK-31020) Read-only mode for Rest API

2023-02-10 Thread Omkar Deshpande (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omkar Deshpande updated FLINK-31020:

Summary: Read-only mode for Rest API  (was: Provide read-only mode for 
flink web UI)

> Read-only mode for Rest API
> ---
>
> Key: FLINK-31020
> URL: https://issues.apache.org/jira/browse/FLINK-31020
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / REST
>Affects Versions: 1.16.1
>Reporter: Omkar Deshpande
>Priority: Major
>
> We run Flink jobs on an application cluster on Kubernetes. We don't 
> submit/cancel or modify jobs from the REST API or web UI. If there were an 
> option to enable only GET operations on the REST service, it would greatly 
> simplify configuring access control and reduce the attack surface.





[jira] [Created] (FLINK-31020) Provide read-only mode for flink web UI

2023-02-10 Thread Omkar Deshpande (Jira)
Omkar Deshpande created FLINK-31020:
---

 Summary: Provide read-only mode for flink web UI
 Key: FLINK-31020
 URL: https://issues.apache.org/jira/browse/FLINK-31020
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / REST
Affects Versions: 1.16.1
Reporter: Omkar Deshpande


We run Flink jobs on an application cluster on Kubernetes. We don't 
submit/cancel or modify jobs from the REST API or web UI. If there were an 
option to enable only GET operations on the REST service, it would greatly 
simplify configuring access control and reduce the attack surface.
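
As a partial mitigation that already exists (worth verifying for the Flink 
version in use; this is narrower than the read-only REST mode requested here), 
the mutating Web UI endpoints can be disabled via configuration:

{code}
# flink-conf.yaml
web.submit.enable: false   # disallow uploading/submitting jars via the UI
web.cancel.enable: false   # disallow canceling jobs from the UI
{code}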





[GitHub] [flink] leeoo commented on pull request #13081: [FLINK-18590][json] Support json array explode to multi messages

2023-02-10 Thread via GitHub


leeoo commented on PR #13081:
URL: https://github.com/apache/flink/pull/13081#issuecomment-1426566305

   @libenchao Okay, but I don't have experience improving the Flink framework; 
it's better for Flink contributors to review it. I will keep an eye on it and 
test it with my Kafka JSON array message data.





[jira] [Closed] (FLINK-28283) Improving the log of flink when job start and deploy

2023-02-10 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-28283.

Resolution: Won't Do

[~zlzhang0122], thanks for your reply. Closing this as won't do.

> Improving the log of flink when job start and deploy
> 
>
> Key: FLINK-28283
> URL: https://issues.apache.org/jira/browse/FLINK-28283
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
>
> When running a large job with many operators and subtasks on Flink, the 
> JobManager and TaskManager produce a huge volume of logs about subtask 
> execution, such as "XXX switched from CREATED to SCHEDULED", "XXX switched 
> from SCHEDULED to DEPLOYING", "XXX switched from DEPLOYING to RUNNING", "XXX 
> switched from RUNNING to CANCELING", "XXX switched from CANCELING to 
> CANCELED", etc.
> Maybe we can improve this, for example by aggregating these messages to 
> reduce the log volume, or by changing the log level and logging only the 
> failure messages and subtasks. I am not sure about the solution, but these 
> messages really are too much.





[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103451369


##
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicsConsumingContext.java:
##
@@ -56,7 +57,11 @@ protected String subscriptionName() {
 @Override
 protected String generatePartitionName() {
 String topic = topicPrefix + index;
-operator.createTopic(topic, 1);
+try {
+operator.createTopic(topic, 1);
+} catch (Exception e) {

Review Comment:
   I'll keep it unfixed in this PR until it gets fixed in 
[FLINK-31014](https://issues.apache.org/jira/browse/FLINK-31014).



##
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java:
##
@@ -55,7 +56,11 @@ public Sink createSink(TestingSinkSettings 
sinkSettings) {
 // Create the topic if it needs.
 if (creatTopic()) {
 for (String topic : topics) {
-operator.createTopic(topic, 4);
+try {
+operator.createTopic(topic, 4);
+} catch (Exception e) {

Review Comment:
   I'll keep it unfixed in this PR until it gets fixed in 
[FLINK-31014](https://issues.apache.org/jira/browse/FLINK-31014).






[GitHub] [flink-connector-pulsar] syhily commented on a diff in pull request #24: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception.

2023-02-10 Thread via GitHub


syhily commented on code in PR #24:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103449916


##
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##
@@ -80,7 +72,7 @@
 import static 
org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 
 /** A pulsar cluster operator used for operating pulsar instance. */
-public class PulsarRuntimeOperator implements Closeable {
+public class PulsarRuntimeOperator {

Review Comment:
   Yep, this is a mistake from my side.






[GitHub] [flink] libenchao commented on pull request #13081: [FLINK-18590][json] Support json array explode to multi messages

2023-02-10 Thread via GitHub


libenchao commented on PR #13081:
URL: https://github.com/apache/flink/pull/13081#issuecomment-1426536898

   @leeoo Thanks for your interest. I'll rebase the PR onto the latest master 
branch shortly; would you please help review it?





[jira] [Comment Edited] (FLINK-30757) Upgrade busybox version to a pinned version for operator

2023-02-10 Thread Shipeng Xie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687321#comment-17687321
 ] 

Shipeng Xie edited comment on FLINK-30757 at 2/11/23 12:16 AM:
---

The e2e 
[error|https://github.com/apache/flink-kubernetes-operator/actions/runs/4147827390/jobs/7175460340#step:9:1340]
 in the busybox init container is {_}wget: error getting response: Connection 
reset by peer{_}. It looks like a similar error has already been reported as an 
[issue|https://github.com/docker-library/busybox/issues/162].
I pushed another commit to downgrade from 1.36.0 to 1.35.0, but the workflow 
run still needs approval.


was (Author: JIRAUSER296422):
The e2e 
[error|https://github.com/apache/flink-kubernetes-operator/actions/runs/4147827390/jobs/7175460340#step:9:1340]
 in the busybox init container is `wget: error getting response: Connection 
reset by peer`. It looks like a similar error has already been reported as an 
[issue|https://github.com/docker-library/busybox/issues/162].
I pushed another commit to downgrade from 1.36.0 to 1.35.0, but the workflow 
run still needs approval.

> Upgrade busybox version to a pinned version for operator
> ---
>
> Key: FLINK-30757
> URL: https://issues.apache.org/jira/browse/FLINK-30757
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gabor Somogyi
>Assignee: Shipeng Xie
>Priority: Minor
>  Labels: pull-request-available, starter
>
> It has been seen that the operator e2e tests were flaky when using the latest 
> version of busybox, so we've pinned it to a relatively old version. 
> https://github.com/apache/flink-kubernetes-operator/commit/8e41ed5ec1adda9ae06bc0b6203abf42939fbf2b
> It would be good to do 2 things:
> * Upgrade the busybox version to the latest in a pinned way
> * Add debug logs of the busybox pod in case of failure (to see why it failed)





[jira] [Commented] (FLINK-30757) Upgrade busybox version to a pinned version for operator

2023-02-10 Thread Shipeng Xie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687321#comment-17687321
 ] 

Shipeng Xie commented on FLINK-30757:
-

The e2e 
[error|https://github.com/apache/flink-kubernetes-operator/actions/runs/4147827390/jobs/7175460340#step:9:1340]
 in the busybox init container is `wget: error getting response: Connection 
reset by peer`. It looks like a similar error has already been reported as an 
[issue|https://github.com/docker-library/busybox/issues/162].
I pushed another commit to downgrade from 1.36.0 to 1.35.0, but the workflow 
run still needs approval.

> Upgrade busybox version to a pinned version for operator
> ---
>
> Key: FLINK-30757
> URL: https://issues.apache.org/jira/browse/FLINK-30757
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gabor Somogyi
>Assignee: Shipeng Xie
>Priority: Minor
>  Labels: pull-request-available, starter
>
> It has been seen that the operator e2e tests were flaky when using the latest 
> version of busybox, so we've pinned it to a relatively old version. 
> https://github.com/apache/flink-kubernetes-operator/commit/8e41ed5ec1adda9ae06bc0b6203abf42939fbf2b
> It would be good to do 2 things:
> * Upgrade the busybox version to the latest in a pinned way
> * Add debug logs of the busybox pod in case of failure (to see why it failed)





[GitHub] [flink-kubernetes-operator] xshipeng commented on pull request #530: [FLINK-30757] Upgrade busybox version to a pinned version for operator

2023-02-10 Thread via GitHub


xshipeng commented on PR #530:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/530#issuecomment-1426506352

   It looks like busybox latest (1.36.0) is unstable -> 
https://github.com/docker-library/busybox/issues/162 





[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-02-10 Thread Andriy Redko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687289#comment-17687289
 ] 

Andriy Redko commented on FLINK-30998:
--

[~martijnvisser] [~lilyevsky] sure, happy to help here

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Priority: Major
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest adding an option to set a failure handler, similar to the way it is 
> done in the Elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is a use case example where it would be very useful. We are using 
> streams on the Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this is how it works for us with Elasticsearch).
> However, with the Opensearch connector, the error comes back saying that the 
> batch failed (even though most of the documents were indexed and only the ones 
> with duplicated IDs were rejected), and the whole Flink job fails.





[jira] [Commented] (FLINK-30627) Refactor FileSystemTableSink to use FileSink instead of StreamingFileSink

2023-02-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687287#comment-17687287
 ] 

Martijn Visser commented on FLINK-30627:


[~tanyuxin] Could you elaborate on how you would like to solve this problem, so 
that some of the other maintainers can judge whether that's a good approach?

> Refactor FileSystemTableSink to use FileSink instead of StreamingFileSink
> -
>
> Key: FLINK-30627
> URL: https://issues.apache.org/jira/browse/FLINK-30627
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem
>Reporter: Martijn Visser
>Priority: Major
>
> {{FileSystemTableSink}} currently depends on most of the capabilities from 
> {{StreamingFileSink}}, for example 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243
> This is necessary to complete FLINK-28641
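
For orientation, the migration target is the unified {{FileSink}}; a minimal 
standalone DataStream sketch of that API (not the actual FileSystemTableSink 
wiring this ticket asks for):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // FileSink is the unified replacement for StreamingFileSink and
        // works in both batch and streaming execution.
        FileSink<String> sink =
                FileSink.forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
                        .build();

        stream.sinkTo(sink);
        env.execute("file-sink-sketch");
    }
}
{code}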





[jira] [Commented] (FLINK-30757) Upgrade busybox version to a pinned version for operator

2023-02-10 Thread Shipeng Xie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687284#comment-17687284
 ] 

Shipeng Xie commented on FLINK-30757:
-

Hi [~gyfora] [~gaborgsomogyi], I added the code to print the init container 
log, tested it locally, and opened a draft PR to trigger the GitHub CI. However, 
it says that {{First-time contributors need a maintainer to approve running 
workflows}}. Could you please help approve? Thanks!

> Upgrade busybox version to a pinned version for operator
> ---
>
> Key: FLINK-30757
> URL: https://issues.apache.org/jira/browse/FLINK-30757
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gabor Somogyi
>Assignee: Shipeng Xie
>Priority: Minor
>  Labels: pull-request-available, starter
>
> It has been seen that the operator e2e tests were flaky when using the latest 
> version of busybox, so we've pinned it to a relatively old version. 
> https://github.com/apache/flink-kubernetes-operator/commit/8e41ed5ec1adda9ae06bc0b6203abf42939fbf2b
> It would be good to do 2 things:
> * Upgrade the busybox version to the latest in a pinned way
> * Add debug logs of the busybox pod in case of failure (to see why it failed)





[jira] [Updated] (FLINK-30757) Upgrade busybox version to a pinned version for operator

2023-02-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-30757:
---
Labels: pull-request-available starter  (was: starter)

> Upgrade busybox version to a pinned version for operator
> ---
>
> Key: FLINK-30757
> URL: https://issues.apache.org/jira/browse/FLINK-30757
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gabor Somogyi
>Assignee: Shipeng Xie
>Priority: Minor
>  Labels: pull-request-available, starter
>
> It has been seen that the operator e2e tests were flaky when using the latest 
> version of busybox, so we've pinned it to a relatively old version. 
> https://github.com/apache/flink-kubernetes-operator/commit/8e41ed5ec1adda9ae06bc0b6203abf42939fbf2b
> It would be good to do 2 things:
> * Upgrade the busybox version to the latest in a pinned way
> * Add debug logs of the busybox pod in case of failure (to see why it failed)





[GitHub] [flink-kubernetes-operator] xshipeng opened a new pull request, #530: [FLINK-30757] Upgrade busybox version to a pinned version for operator

2023-02-10 Thread via GitHub


xshipeng opened a new pull request, #530:
URL: https://github.com/apache/flink-kubernetes-operator/pull/530

   
   
   ## What is the purpose of the change
   
   Since the [e2e 
test](https://github.com/apache/flink-kubernetes-operator/actions/runs/389690/jobs/6662337363)
 is flaky with the latest busybox image in the init container, we 
[pinned](https://github.com/apache/flink-kubernetes-operator/commit/8e41ed5ec1adda9ae06bc0b6203abf42939fbf2b)
 the image version to a relatively old version.
   
   ## Brief change log
 - Add log printing for the init container if the e2e test fails.
 - Upgrade the busybox version to the latest in a pinned way.
   
   ## Verifying this change
   
   
   This change is already covered by existing tests, such as 
`e2e-tests/test_application_kubernetes_ha.sh`.
   
   To reproduce the failed tests locally and test the modified init container 
log-printing code:
 - Change the init container command in `e2e-tests/data/flinkdep-cr.yaml` 
to `command: [ 'wget', 
'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar',
 '-O', '/flink-artifact/myjob.jar' ]`.
 - Run `./e2e-tests/test_application_kubernetes_ha.sh`.
 - Part of the printed log will be 
 ```
   ...
   Flink logs:
   Current logs for 
flink-example-statemachine-dddb4d664-m6n5b:artifacts-fetcher: 
   Connecting to repo1.maven.org (198.18.3.208:443)
   wget: note: TLS certificate validation not implemented
   saving to '/flink-artifact/myjob.jar'
   myjob.jar100% ||  267k  0:00:00 
ETA
   '/flink-artifact/myjob.jar' saved
   test error msg
   Previous logs for 
flink-example-statemachine-dddb4d664-m6n5b:artifacts-fetcher: 
   Connecting to repo1.maven.org (198.18.3.208:443)
   wget: note: TLS certificate validation not implemented
   saving to '/flink-artifact/myjob.jar'
   myjob.jar100% ||  267k  0:00:00 
ETA
   '/flink-artifact/myjob.jar' saved
   test error msg
   Current logs for 
flink-example-statemachine-dddb4d664-m6n5b:flink-main-container: 
   Current logs for 
flink-kubernetes-operator-9744c66bd-27q2r:flink-kubernetes-operator: 
   Starting Operator
   ...
   ```
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no
 - If yes, how is the feature documented? not applicable
   





[GitHub] [flink] MartijnVisser commented on pull request #21897: [FLINK-30922][table-planner] Apply persisted columns when doing appendPartitionAndNu…

2023-02-10 Thread via GitHub


MartijnVisser commented on PR #21897:
URL: https://github.com/apache/flink/pull/21897#issuecomment-1426325867

   @shuiqiangchen You need to rebase your PR on the latest changes in `master`





[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-02-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687276#comment-17687276
 ] 

Martijn Visser commented on FLINK-30998:


[~reta] WDYT?

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Priority: Major
>
> Currently, when there is a failure coming from Opensearch, the 
> FlinkRuntimeException is thrown from OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest adding an option to set a failure handler, similar to the way it is 
> done in the Elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is a use case example where it would be very useful. We are using 
> streams on the Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this is how it works for us with Elasticsearch).
> However, with the Opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed, only the ones 
> with duplicated IDs were rejected), and the whole Flink job fails.
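
A rough sketch of what the proposed option could look like, loosely modeled on 
the failure-handler hook of the legacy Elasticsearch connector; the interface 
name, method signature and wiring below are hypothetical, not an existing 
Opensearch connector API:

{code:java}
import org.opensearch.action.bulk.BulkItemResponse;

import java.io.Serializable;

/** Hypothetical handler interface sketching the proposal (not an existing API). */
public interface OpensearchFailureHandler extends Serializable {

    /**
     * Called for each failed bulk item. Implementations may swallow the failure
     * (e.g. duplicate-ID rejections) or rethrow to fail the pipeline.
     */
    void onFailure(BulkItemResponse.Failure failure) throws Exception;
}
{code}

With such a hook, the writer could consult the handler before throwing 
FlinkRuntimeException, letting clients ignore duplicate-ID rejections while 
still failing fast on everything else.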



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-30971) Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'

2023-02-10 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser reassigned FLINK-30971:
--

Assignee: Yunhong Zheng

> Modify the default value of parameter 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold'
> -
>
> Key: FLINK-30971
> URL: https://issues.apache.org/jira/browse/FLINK-30971
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In our test environment, we set the default parallelism to 1 and found the 
> most appropriate default value of parameter 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold' to be 500. However, 
> for batch jobs with high parallelism in production environments, the 
> amount of data per parallel instance is often less than 500. Therefore, 
> after testing, we found that setting it to 50 gives better results.
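
For reference, a minimal sketch of overriding the threshold per job once the 
new default lands; the option key is taken from this ticket, everything else is 
illustrative:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SamplingThresholdExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Override the adaptive local hash-agg sampling threshold for this job.
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.local-hash-agg.adaptive.sampling-threshold", "50");
    }
}
{code}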



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30971) Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'

2023-02-10 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-30971:
---
Fix Version/s: 1.17.0

> Modify the default value of parameter 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold'
> -
>
> Key: FLINK-30971
> URL: https://issues.apache.org/jira/browse/FLINK-30971
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> In our test environment, we set the default parallelism to 1 and found the 
> most appropriate default value of parameter 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold' to be 500. However, 
> for batch jobs with high parallelism in production environments, the 
> amount of data per parallel instance is often less than 500. Therefore, 
> after testing, we found that setting it to 50 gives better results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30971) Modify the default value of parameter 'table.exec.local-hash-agg.adaptive.sampling-threshold'

2023-02-10 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-30971:
---
Fix Version/s: (was: 1.17.0)

> Modify the default value of parameter 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold'
> -
>
> Key: FLINK-30971
> URL: https://issues.apache.org/jira/browse/FLINK-30971
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.0
>Reporter: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
>
> In our test environment, we set the default parallelism to 1 and found the 
> most appropriate default value of parameter 
> 'table.exec.local-hash-agg.adaptive.sampling-threshold' to be 500. However, 
> for batch jobs with high parallelism in production environments, the 
> amount of data per parallel instance is often less than 500. Therefore, 
> after testing, we found that setting it to 50 gives better results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31006) job is not finished when using pipeline mode to run bounded source like kafka/pulsar

2023-02-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687275#comment-17687275
 ] 

Martijn Visser commented on FLINK-31006:


[~renqs] WDYT?

> job is not finished when using pipeline mode to run bounded source like 
> kafka/pulsar
> 
>
> Key: FLINK-31006
> URL: https://issues.apache.org/jira/browse/FLINK-31006
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: jackylau
>Assignee: jackylau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2023-02-10-13-20-52-890.png, 
> image-2023-02-10-13-23-38-430.png, image-2023-02-10-13-24-46-929.png
>
>
> When I do failover tests, such as killing the JM/TM, while using pipeline mode 
> to run a bounded source like Kafka, I found the job is not finished even when 
> every partition's data has been consumed.
>  
> After digging into the code, I found this logic does not run when the JM 
> recovers: the partition infos are not changed, so noMoreNewPartitionSplits is 
> not set to true, and then this code will not run 
>  
> !image-2023-02-10-13-23-38-430.png!
>  
> !image-2023-02-10-13-24-46-929.png!
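
For context, a minimal sketch of the kind of bounded-Kafka pipeline affected by 
this ticket; broker address, topic and group id are placeholders:

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaExample {
    public static void main(String[] args) throws Exception {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("input-topic") // placeholder
                .setGroupId("bounded-demo") // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest()) // makes the source bounded
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Even in streaming (pipeline) mode the job should finish once all
        // partitions reach the bounded offsets -- the behavior this bug breaks
        // after a JM failover.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka").print();
        env.execute("bounded-kafka-demo");
    }
}
{code}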



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31007) The code generated by the IF function throws NullPointerException

2023-02-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687274#comment-17687274
 ] 

Martijn Visser commented on FLINK-31007:


[~lincoln.86xy] WDYT?

> The code generated by the IF function throws NullPointerException
> -
>
> Key: FLINK-31007
> URL: https://issues.apache.org/jira/browse/FLINK-31007
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.15.2, 1.15.3
> Environment: {code:java}
> // code placeholder
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> final DataStream tab =
> env.fromCollection(Arrays.asList(
> new Tuple2<>(1L, "a_b_c"),
> new Tuple2<>(-1L, "a_b_c")));
> final Table tableA = tableEnv.fromDataStream(tab);
> tableEnv.executeSql("SELECT if(f0 = -1, '', split_index(f1, '_', 0)) as id 
> FROM " + tableA)
> .print(); {code}
>Reporter: tivanli
>Priority: Major
> Attachments: StreamExecCalc$19.java, image-2023-02-10-17-20-51-619.png
>
>
> Caused by: java.lang.NullPointerException
>     at StreamExecCalc$19.processElement_split1(Unknown Source)
>     at StreamExecCalc$19.processElement(Unknown Source)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at 
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
>     at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
>     at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:231)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] MartijnVisser commented on pull request #21630: [FLINK-30166][Connector/FileSystem] Refactor tests that use the deprecated StreamingFileSink instead of FileSink

2023-02-10 Thread via GitHub


MartijnVisser commented on PR #21630:
URL: https://github.com/apache/flink/pull/21630#issuecomment-1426319887

   @gaoyunhaii Can you take one more look? You've already reviewed the original 
PR, which missed one thing that is now addressed in this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-30627) Refactor FileSystemTableSink to use FileSink instead of StreamingFileSink

2023-02-10 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-30627:
---
Description: 
{{FileSystemTableSink}} currently depends on most of the capabilities from 
{{StreamingFileSink}}, for example 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243

This is necessary to complete FLINK-28641

  was:In order to be able to remove the StreamingFileSink, the 
FileSystemTableSink needs to be refactored to no longer depend on 
StreamingFileSink but on FileSink


> Refactor FileSystemTableSink to use FileSink instead of StreamingFileSink
> -
>
> Key: FLINK-30627
> URL: https://issues.apache.org/jira/browse/FLINK-30627
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / FileSystem
>Reporter: Martijn Visser
>Priority: Major
>
> {{FileSystemTableSink}} currently depends on most of the capabilities from 
> {{StreamingFileSink}}, for example 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243
> This is necessary to complete FLINK-28641



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31019) Migrate FileSystemTableSink to FileSink

2023-02-10 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser closed FLINK-31019.
--
Resolution: Duplicate

> Migrate FileSystemTableSink to FileSink
> ---
>
> Key: FLINK-31019
> URL: https://issues.apache.org/jira/browse/FLINK-31019
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Martijn Visser
>Priority: Major
>
> {{FileSystemTableSink}} currently depends on most of the capabilities from 
> {{StreamingFileSink}}, for example 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243
> This is necessary to complete FLINK-28641



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31019) Migrate FileSystemTableSink to FileSink

2023-02-10 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-31019:
--

 Summary: Migrate FileSystemTableSink to FileSink
 Key: FLINK-31019
 URL: https://issues.apache.org/jira/browse/FLINK-31019
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Martijn Visser


{{FileSystemTableSink}} currently depends on most of the capabilities from 
{{StreamingFileSink}}, for example 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L223-L243

This is necessary to complete FLINK-28641



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink

2023-02-10 Thread via GitHub


reta commented on code in PR #5:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103191251


##
.github/workflows/push_pr.yml:
##
@@ -25,6 +25,6 @@ jobs:
   compile_and_test:
 uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
 with:
-  flink_version: 1.16.0
-  flink_url: 
https://dist.apache.org/repos/dist/release/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz
+  flink_version: 1.16.1
+  flink_url: 
https://dist.apache.org/repos/dist/release/flink/flink-1.16.1/flink-1.16.1-bin-scala_2.12.tgz

Review Comment:
   (addressed in https://github.com/apache/flink-connector-opensearch/pull/7)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink

2023-02-10 Thread via GitHub


reta commented on PR #5:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/5#issuecomment-1426283511

   Thanks @dannycranmer , I think I went through all your comments, thanks a 
lot, really appreciate it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] dannycranmer commented on pull request #7: [hotfix] Update Apache Flink to 1.16.1

2023-02-10 Thread via GitHub


dannycranmer commented on PR #7:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/7#issuecomment-1426268450

   Thanks @reta 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] dannycranmer merged pull request #7: [hotfix] Update Apache Flink to 1.16.1

2023-02-10 Thread via GitHub


dannycranmer merged PR #7:
URL: https://github.com/apache/flink-connector-opensearch/pull/7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21911: [FLINK-19065] [runtime] Generate unique and consistent UID for internal Map operators used in CoGroupedStreams

2023-02-10 Thread via GitHub


flinkbot commented on PR #21911:
URL: https://github.com/apache/flink/pull/21911#issuecomment-1426267913

   
   ## CI report:
   
   * cee862b208cfd3674f6cbcd64d07e7f7c871253b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-19065) java.lang.IllegalStateException: Auto generated UIDs have been disabled on join

2023-02-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-19065:
---
Labels: auto-deprioritized-major auto-deprioritized-minor 
pull-request-available  (was: auto-deprioritized-major auto-deprioritized-minor)

> java.lang.IllegalStateException: Auto generated UIDs have been disabled on 
> join
> ---
>
> Key: FLINK-19065
> URL: https://issues.apache.org/jira/browse/FLINK-19065
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.11.0, 1.11.1
>Reporter: Maris
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> Join operation with AutoGeneratedUID disabled leads to 
> {code:java}
> java.lang.IllegalStateException: Auto generated UIDs have been disabled but 
> no UID or hash has been assigned to operator Map
> {code}
> code to reproduce
> {code:java}
> class JoinSpec extends AnyFlatSpec with Matchers with Serializable {
>   it should "be able to join streams" in {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getConfig.disableAutoGeneratedUIDs()
> val a = env.fromCollection(List("1", "2", 
> "3")).name("a").uid("source-uid")
> val b = env.fromCollection(List("1", "2", 
> "3")).name("b").uid("source-uid2")
> val c = a
>   .join(b)
>   .where(identity)
>   .equalTo(identity)
>   .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))((a, b) => 
> a+b)
>   .uid("joined").name("joined")
> c.addSink(s => println(s))
>   .name("ab")
>   .uid("ab")
> println(env.getExecutionPlan)
> env.execute
> succeed
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] hmit opened a new pull request, #21911: [FLINK-19065] [runtime] Generate unique and consistent UID for internal Map operators used in CoGroupedStreams

2023-02-10 Thread via GitHub


hmit opened a new pull request, #21911:
URL: https://github.com/apache/flink/pull/21911

   
   
   ## What is the purpose of the change
   
   This fixes an issue where a Flink job using join/cogroup fails when run with 
autoGenerateUid disabled. With uid generation disabled, the internal input -> 
map-for-union operator throws a missing-UID exception from StreamGraphUtils.
   This PR fixes the issue by assigning each internal Map operator a unique and 
deterministic uid derived from its input operator's uid.
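   
   A minimal sketch of the idea (helper name and uid suffix are illustrative, 
not necessarily the PR's exact code):
   ```java
   // Derive a deterministic uid for the internal tagged-union Map from the
   // uid of its input, so jobs with auto-generated UIDs disabled keep passing
   // StreamGraph validation.
   static String derivedMapUid(String inputUid, String side) {
       return inputUid == null ? null : inputUid + "-" + side + "-taggedUnionMap";
   }
   ```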
   
   
   ## Brief change log
   
 - Generate unique and consistent UID for internal Map operators used in 
CoGroupedStreams
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mxm commented on a diff in pull request #21908: [FLINK-30895][coordination] Dynamically adjust slot distribution

2023-02-10 Thread via GitHub


mxm commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1103096757


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##
@@ -133,7 +138,56 @@ public Optional 
determineParallelism(
 return Optional.of(new 
VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
 }
 
-private static Map determineParallelism(
+/**
+ * Distributes free slots across the slot-sharing groups of the job. Slots 
are distributed as
+ * evenly as possible while taking the minimum parallelism of contained 
vertices into account.
+ */
+private static Map 
determineSlotsPerSharingGroup(
+JobInformation jobInformation, int freeSlots) {
+int numUnassignedSlots = freeSlots;
+int numUnassignedSlotSharingGroups = 
jobInformation.getSlotSharingGroups().size();
+
+final Map slotSharingGroupParallelism = 
new HashMap<>();
+
+for (Tuple2 slotSharingGroup :
+sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+final int groupParallelism =
+Math.min(
+slotSharingGroup.f1,
+numUnassignedSlots / 
numUnassignedSlotSharingGroups);

Review Comment:
   That said, all of this is not set in stone. We can probably add a mode to 
the adaptive scheduler which disables downscaling in combination with 
pre-requesting the right amount of resources.
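   
   A standalone sketch of the distribution logic in the diff above, with 
simplified types (group name mapped to its max parallelism); sorting ascending 
by the cap lets capped groups return their surplus to the groups that follow:
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   static Map<String, Integer> distribute(Map<String, Integer> maxParallelismPerGroup, int freeSlots) {
       List<Map.Entry<String, Integer>> sorted = new ArrayList<>(maxParallelismPerGroup.entrySet());
       sorted.sort(Map.Entry.comparingByValue()); // smallest cap first
       Map<String, Integer> result = new HashMap<>();
       int unassignedSlots = freeSlots;
       int unassignedGroups = sorted.size();
       for (Map.Entry<String, Integer> group : sorted) {
           // each group takes at most an even share of what is still unassigned
           int share = Math.min(group.getValue(), unassignedSlots / unassignedGroups);
           result.put(group.getKey(), share);
           unassignedSlots -= share;
           unassignedGroups--;
       }
       return result;
   }
   ```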



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mxm commented on a diff in pull request #21908: [FLINK-30895][coordination] Dynamically adjust slot distribution

2023-02-10 Thread via GitHub


mxm commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1103092025


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##
@@ -133,7 +138,56 @@ public Optional 
determineParallelism(
 return Optional.of(new 
VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
 }
 
-private static Map determineParallelism(
+/**
+ * Distributes free slots across the slot-sharing groups of the job. Slots 
are distributed as
+ * evenly as possible while taking the minimum parallelism of contained 
vertices into account.
+ */
+private static Map 
determineSlotsPerSharingGroup(
+JobInformation jobInformation, int freeSlots) {
+int numUnassignedSlots = freeSlots;
+int numUnassignedSlotSharingGroups = 
jobInformation.getSlotSharingGroups().size();
+
+final Map slotSharingGroupParallelism = 
new HashMap<>();
+
+for (Tuple2 slotSharingGroup :
+sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+final int groupParallelism =
+Math.min(
+slotSharingGroup.f1,
+numUnassignedSlots / 
numUnassignedSlotSharingGroups);

Review Comment:
   Sorry for the meta comment but I'm looking at this from the perspective of a 
future Rescale API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] mxm commented on a diff in pull request #21908: [FLINK-30895][coordination] Dynamically adjust slot distribution

2023-02-10 Thread via GitHub


mxm commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1103091640


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##
@@ -133,7 +138,56 @@ public Optional 
determineParallelism(
 return Optional.of(new 
VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
 }
 
-private static Map determineParallelism(
+/**
+ * Distributes free slots across the slot-sharing groups of the job. Slots 
are distributed as
+ * evenly as possible while taking the minimum parallelism of contained 
vertices into account.
+ */
+private static Map 
determineSlotsPerSharingGroup(
+JobInformation jobInformation, int freeSlots) {
+int numUnassignedSlots = freeSlots;
+int numUnassignedSlotSharingGroups = 
jobInformation.getSlotSharingGroups().size();
+
+final Map slotSharingGroupParallelism = 
new HashMap<>();
+
+for (Tuple2 slotSharingGroup :
+sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+final int groupParallelism =
+Math.min(
+slotSharingGroup.f1,
+numUnassignedSlots / 
numUnassignedSlotSharingGroups);

Review Comment:
   That is terrible from an autoscaler perspective, as we want full control 
over the scaling. We never want the adaptive scheduler to reduce the 
parallelism in any way. Instead, we provide the spec and the scheduler has to 
rescale the job safely. It either fulfils the rescaling request, or it does 
nothing. We do not have unlimited retries for rescaling. Rescaling is costly, 
especially for stateful workloads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink

2023-02-10 Thread via GitHub


reta commented on code in PR #5:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103088811


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncWriter.java:
##
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.ssl.SSLContexts;
+import org.opensearch.OpenSearchException;
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.bulk.BulkItemResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Flink's Async Sink Writer to insert or update data in an Opensearch 
index (see please
+ * {@link OpensearchAsyncSink}).
+ *
+ * @param  type of the records converted to Opensearch actions 
(instances of {@link
+ * DocSerdeRequest})
+ */
+@Internal
+class OpensearchAsyncWriter extends AsyncSinkWriter> {
+private static final Logger LOG = 
LoggerFactory.getLogger(OpensearchAsyncWriter.class);
+
+private final RestHighLevelClient client;
+private final Counter numRecordsOutErrorsCounter;
+private volatile boolean closed = false;
+
+private static final FatalExceptionClassifier 
OPENSEARCH_FATAL_EXCEPTION_CLASSIFIER =
+FatalExceptionClassifier.createChain(
+new FatalExceptionClassifier(
+err ->
+err instanceof NoRouteToHostException
+|| err instanceof ConnectException,
+err ->
+new OpenSearchException(
+"Could not connect to Opensearch 
cluster using provided hosts",
+err)));
+
+/**
+ * Constructor creating an Opensearch async writer.
+ *
+ * @param context the initialization context
+ * @param elementConverter converting incoming records to Opensearch write 
document requests
+ * @param maxBatchSize the maximum size of a batch of entries that may be 
sent
+ * @param maxInFlightRequests he maximum number of in flight requests that 
may exist, if any
+ * more in flight requests need to be initiated once the maximum has 
been reached, then it
+ * will be blocked until some have completed
+ * @param maxBufferedRequests the maximum number of elements held in the 
buffer, requests to add
+ * elements will be 

[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink

2023-02-10 Thread via GitHub


reta commented on code in PR #5:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103068913


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.io.stream.BytesStreamOutput;
+import org.opensearch.common.io.stream.InputStreamStreamInput;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Wrapper class around {@link DocWriteRequest} since it does not implement 
{@link Serializable},
+ * required by AsyncSink scaffolding.
+ *
+ * @param  type of the write request
+ */
+@PublicEvolving
+public class DocSerdeRequest implements Serializable {

Review Comment:
   ~This is my bad, the `T` must be constrained, I will fix it~ Removed `T`, 
not necessary indeed
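   
   A generic sketch of the wrapper pattern this class implements (plain JDK 
custom-serialization hooks; the payload type below is a stand-in, not the 
connector's code):
   ```java
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;

   // Wraps a payload whose type is not Serializable by writing and reading it
   // manually in custom serialization hooks.
   final class NonSerializablePayloadWrapper implements Serializable {
       private transient StringBuilder payload; // stand-in for a non-Serializable type

       NonSerializablePayloadWrapper(StringBuilder payload) {
           this.payload = payload;
       }

       private void writeObject(ObjectOutputStream out) throws IOException {
           out.defaultWriteObject();
           out.writeUTF(payload.toString()); // serialize the payload by hand
       }

       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
           in.defaultReadObject();
           payload = new StringBuilder(in.readUTF()); // rebuild the payload
       }
   }
   ```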



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #529: [FLINK-30776] Move autoscaler code into a separate Maven module

2023-02-10 Thread via GitHub


mxm commented on code in PR #529:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/529#discussion_r1103083946


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import com.google.auto.service.AutoService;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/**
+ * Factory for loading JobAutoScalerImpl included in this module. This class 
will be dynamically
+ * instantiated by the main operator module.
+ */
+@AutoService(JobAutoScalerFactory.class)

Review Comment:
   This is still required. All plugins have to declare a META-INF/services file, 
which this annotation and the auto-service plugin generate automatically.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -468,6 +468,16 @@ protected void setOwnerReference(CR owner, Configuration 
deployConfig) {
 KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE, 
List.of(ownerReference));
 }
 
+private JobAutoScaler loadJobAutoscaler() {
+for (JobAutoScalerFactory factory : 
ServiceLoader.load(JobAutoScalerFactory.class)) {
+LOG.info("Loading JobAutoScaler implementation: {}", 
factory.getClass().getName());
+return factory.create(kubernetesClient, eventRecorder);
+}
+LOG.info("No JobAutoscaler implementation found. Autoscaling is 
disabled.");
+return new NoopJobAutoscaler();

Review Comment:
   See the latest commit.
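   
   How the two hunks above fit together (a compressed sketch, not the operator's 
code verbatim): `@AutoService` emits the 
`META-INF/services/...JobAutoScalerFactory` descriptor at compile time, and the 
reconciler's `ServiceLoader` loop picks the first implementation found on the 
classpath:
   ```java
   import java.util.Iterator;
   import java.util.ServiceLoader;

   // Plugin discovery: first factory on the classpath wins, otherwise no-op.
   // (JobAutoScaler types, KubernetesClient and EventRecorder come from the
   // operator modules shown in the diff.)
   static JobAutoScaler loadJobAutoScaler(KubernetesClient client, EventRecorder recorder) {
       Iterator<JobAutoScalerFactory> factories =
               ServiceLoader.load(JobAutoScalerFactory.class).iterator();
       return factories.hasNext()
               ? factories.next().create(client, recorder)
               : new NoopJobAutoscaler();
   }
   ```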



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] mxm commented on a diff in pull request #529: [FLINK-30776] Move autoscaler code into a separate Maven module

2023-02-10 Thread via GitHub


mxm commented on code in PR #529:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/529#discussion_r1103083946


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
+import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import com.google.auto.service.AutoService;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/**
+ * Factory for loading JobAutoScalerImpl included in this module. This class 
will be dynamically
+ * instantiated by the main operator module.
+ */
+@AutoService(JobAutoScalerFactory.class)

Review Comment:
   This is still required. All plugins have to declare a META-INF/services file, 
which this annotation generates automatically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink

2023-02-10 Thread via GitHub


reta commented on code in PR #5:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103068913


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.io.stream.BytesStreamOutput;
+import org.opensearch.common.io.stream.InputStreamStreamInput;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Wrapper class around {@link DocWriteRequest} since it does not implement 
{@link Serializable},
+ * required by AsyncSink scaffolding.
+ *
+ * @param  type of the write request
+ */
+@PublicEvolving
+public class DocSerdeRequest implements Serializable {

Review Comment:
   This is my bad, the `T` must be constrained, I will fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink

2023-02-10 Thread via GitHub


reta commented on code in PR #5:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1103066160


##
.github/workflows/push_pr.yml:
##
@@ -25,4 +25,4 @@ jobs:
   compile_and_test:
 uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
 with:
-  flink_version: 1.16.0
+  flink_version: 1.16.1

Review Comment:
   @dannycranmer https://github.com/apache/flink-connector-opensearch/pull/7



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-02-10 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1101701294


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitState.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import com.datastax.driver.core.ResultSet;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+
+/**
+ * Mutable {@link CassandraSplit} that keeps track of the reading process of 
the associated split.
+ */
+public class CassandraSplitState {
+private final CassandraSplit cassandraSplit;
+// Cassandra ResultSet is paginated, a new page is read only if all the 
records of the previous
+// one were consumed. fetch() can be interrupted so we use the resultSet 
to keep track of the
+// reading process.
+// It is null when reading has not started (before fetch is called on the 
split).
+@Nullable private ResultSet resultSet;

Review Comment:
   As discussed in the other comment, ResultSet is just a handle, so the status 
of the read will not be part of the checkpoint, indeed leading to a re-read of 
the already-output data. The only way is to manage the memory size of the 
split at the enumerator level and either output the whole split or nothing at 
all. That way, in case of an interrupted fetch nothing will be output, and the 
split can be read again from the beginning after recovery, leading to no duplicates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-02-10 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1101601871


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class generates {@link CassandraSplit}s based on Cassandra cluster 
partitioner and Flink
+ * source parallelism.
+ */
+public final class SplitsGenerator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SplitsGenerator.class);
+
+private final CassandraPartitioner partitioner;
+
+public SplitsGenerator(CassandraPartitioner partitioner) {
+this.partitioner = partitioner;
+}
+
+/**
+ * Split Cassandra tokens ring into {@link CassandraSplit}s containing 
each a range of the ring.
+ *
+ * @param numSplits requested number of splits
+ * @return list containing {@code numSplits} CassandraSplits.
+ */
+public List generateSplits(long numSplits) {
+if (numSplits == 1) {
+return Collections.singletonList(
+new CassandraSplit(partitioner.minToken(), 
partitioner.maxToken()));
+}
+List splits = new ArrayList<>();
+BigInteger splitSize =
+(partitioner.ringSize()).divide(new 
BigInteger(String.valueOf(numSplits)));
+
+BigInteger startToken, endToken = partitioner.minToken();
+for (int splitCount = 1; splitCount <= numSplits; splitCount++) {
+startToken = endToken;
+endToken = startToken.add(splitSize);
+if (splitCount == numSplits) {
+endToken = partitioner.maxToken();
+}
+splits.add(new CassandraSplit(startToken, endToken));
+}

Review Comment:
   :+1: for the general change, but regarding `splitCount == numSplits`: it is 
there to make sure the last split in the list covers the max token despite 
division rounding. If I did this outside of the loop and added a final split, it 
would be very small. I think it is better to extend the previous one.
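   
   A small sketch of the rounding argument (method and names are illustrative):
   ```java
   import java.math.BigInteger;

   // Integer division of the ring size leaves a remainder of at most
   // numSplits - 1 tokens; ending the last split at maxToken folds that
   // remainder in instead of emitting a tiny trailing split.
   static BigInteger[] lastSplitBounds(BigInteger minToken, BigInteger maxToken, long numSplits) {
       BigInteger ringSize = maxToken.subtract(minToken);
       BigInteger splitSize = ringSize.divide(BigInteger.valueOf(numSplits));
       // start of the last split after (numSplits - 1) even-sized splits
       BigInteger lastStart = minToken.add(splitSize.multiply(BigInteger.valueOf(numSplits - 1)));
       return new BigInteger[] {lastStart, maxToken}; // end extended to maxToken
   }
   ```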



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-02-10 Thread via GitHub


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1100252874


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java:
##
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitState;
+import org.apache.flink.connector.cassandra.source.split.RingRange;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for 
fetching the records as
+ * {@link CassandraRow}. For that, it executes a range query (query that 
outputs records belonging
+ * to a {@link RingRange}) based on the user specified query. This class 
manages the Cassandra
+ * cluster and session.
+ */
+public class CassandraSplitReader implements SplitReader {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitReader.class);
+public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
+private final Cluster cluster;
+private final Session session;
+private final Set unprocessedSplits;
+private final AtomicBoolean wakeup = new AtomicBoolean(false);
+private final String query;
+
+public CassandraSplitReader(ClusterBuilder clusterBuilder, String query) {
+this.unprocessedSplits = new HashSet<>();
+this.query = query;
+cluster = clusterBuilder.getCluster();
+session = cluster.connect();

Review Comment:
   Yes, I thought about that, but the problem I had was when to close the 
session/cluster. If you prefer this solution, I can override 
`SourceReaderBase#close()` and close them there, plus create a factory to 
initialize the cluster, session and mapper at the creation of the SourceReader 
before the super(...) call. Also, I find the design of passing a map function 
from the source reader to the emitter elegant. I'll do so.
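   
   A sketch of the discussed cleanup, assuming the reader keeps the cluster and 
session as fields (names illustrative):
   ```java
   @Override
   public void close() throws Exception {
       super.close(); // SourceReaderBase cleanup first
       try {
           if (session != null) {
               session.close();
           }
       } finally {
           if (cluster != null) {
               cluster.close();
           }
       }
   }
   ```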



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-opensearch] reta opened a new pull request, #7: [hotfix] Update Apache Flink to 1.16.1

2023-02-10 Thread via GitHub


reta opened a new pull request, #7:
URL: https://github.com/apache/flink-connector-opensearch/pull/7

   Update Apache Flink to 1.16.1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31018) SQL Client -j option does not load user jars to classpath.

2023-02-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687220#comment-17687220
 ] 

Martijn Visser commented on FLINK-31018:


Great to hear that fixes it, thanks for updating it!

> SQL Client -j option does not load user jars to classpath.
> --
>
> Key: FLINK-31018
> URL: https://issues.apache.org/jira/browse/FLINK-31018
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Krzysztof Chmielewski
>Priority: Minor
> Attachments: image-2023-02-10-15-53-39-330.png, 
> image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png
>
>
> SQL Client '-j' option does not load custom jars to the classpath as it did, 
> for example, in Flink 1.15.
> As a result, the Flink 1.16 SQL Client is not able to discover classes 
> through Flink's Factory discovery mechanism, throwing an error like:
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Could not find any factories 
> that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in 
> the classpath.
> {code}
> The same jar and sample job work fine with Flink 1.15.
> Flink 1.15.2
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-53-39-330.png! 
> Flink 1.16.1
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-54-32-537.png! 
> The ADD JAR command does not solve the "Could not find any factories" issue 
> although the jar seems to be added:
>  !image-2023-02-10-16-05-12-407.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31018) SQL Client -j option does not load user jars to classpath.

2023-02-10 Thread Krzysztof Chmielewski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687218#comment-17687218
 ] 

Krzysztof Chmielewski edited comment on FLINK-31018 at 2/10/23 5:39 PM:


[~martijnvisser] yes, it seems that this is the case.

I've used DynamicTableFactory.Context#getClassLoader instead of 
Thread.currentThread().getContextClassLoader() as suggested in one of the 
comments, and the problem disappeared. 

Thanks,
The ticket can be closed.


was (Author: kristoffsc):
[~martijnvisser] yes, it seems that this is the case.

I've used `DynamicTableFactory.Context#getClassLoader` instead of 
`Thread.currentThread().getContextClassLoader()` as suggested in one of the 
comments, and the problem disappeared. 

Thanks,
The ticket can be closed.

> SQL Client -j option does not load user jars to classpath.
> --
>
> Key: FLINK-31018
> URL: https://issues.apache.org/jira/browse/FLINK-31018
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Krzysztof Chmielewski
>Priority: Minor
> Attachments: image-2023-02-10-15-53-39-330.png, 
> image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png
>
>
> SQL Client '-j' option does not load custom jars to the classpath as it did, 
> for example, in Flink 1.15.
> As a result, the Flink 1.16 SQL Client is not able to discover classes 
> through Flink's Factory discovery mechanism, throwing an error like:
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Could not find any factories 
> that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in 
> the classpath.
> {code}
> The same jar and sample job work fine with Flink 1.15.
> Flink 1.15.2
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-53-39-330.png! 
> Flink 1.16.1
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-54-32-537.png! 
> The ADD JAR command does not solve the "Could not find any factories" issue 
> although the jar seems to be added:
>  !image-2023-02-10-16-05-12-407.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31018) SQL Client -j option does not load user jars to classpath.

2023-02-10 Thread Krzysztof Chmielewski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Chmielewski closed FLINK-31018.
-
Resolution: Not A Bug

> SQL Client -j option does not load user jars to classpath.
> --
>
> Key: FLINK-31018
> URL: https://issues.apache.org/jira/browse/FLINK-31018
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Krzysztof Chmielewski
>Priority: Minor
> Attachments: image-2023-02-10-15-53-39-330.png, 
> image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png
>
>
> SQL Client '-j' option does not load custom jars to the classpath as it did, 
> for example, in Flink 1.15.
> As a result, the Flink 1.16 SQL Client is not able to discover classes 
> through Flink's Factory discovery mechanism, throwing an error like:
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Could not find any factories 
> that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in 
> the classpath.
> {code}
> The same jar and sample job work fine with Flink 1.15.
> Flink 1.15.2
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-53-39-330.png! 
> Flink 1.16.1
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-54-32-537.png! 
> The ADD JAR command does not solve the "Could not find any factories" issue 
> although the jar seems to be added:
>  !image-2023-02-10-16-05-12-407.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31018) SQL Client -j option does not load user jars to classpath.

2023-02-10 Thread Krzysztof Chmielewski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687218#comment-17687218
 ] 

Krzysztof Chmielewski commented on FLINK-31018:
---

[~martijnvisser] yes, it seems that this is the case.

I've used `DynamicTableFactory.Context#getClassLoader` instead of 
`Thread.currentThread().getContextClassLoader()` as suggested in one of the 
comments, and the problem disappeared. 

Thanks,
The ticket can be closed.
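
As a sketch of that fix: a hypothetical lookup-connector factory resolving its 
helper factory through the classloader the planner provides (the factory names 
and the identifier are illustrative; only `Context#getClassLoader` and 
`FactoryUtil.discoverFactory` come from Flink):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.Collections;
import java.util.Set;

// Hypothetical factory sketch: jars added via -j / ADD JAR are visible to the
// classloader handed in by the planner, unlike the thread context classloader.
public class HttpLookupTableSourceFactory implements DynamicTableSourceFactory {

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        LookupQueryCreatorFactory queryCreatorFactory =
                FactoryUtil.discoverFactory(
                        context.getClassLoader(), // not Thread.currentThread().getContextClassLoader()
                        LookupQueryCreatorFactory.class,
                        "generic-json-url"); // hypothetical identifier
        return buildTableSource(queryCreatorFactory, context); // building elided
    }

    @Override
    public String factoryIdentifier() {
        return "http-lookup"; // hypothetical
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
{code}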

> SQL Client -j option does not load user jars to classpath.
> --
>
> Key: FLINK-31018
> URL: https://issues.apache.org/jira/browse/FLINK-31018
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Krzysztof Chmielewski
>Priority: Minor
> Attachments: image-2023-02-10-15-53-39-330.png, 
> image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png
>
>
> SQL Client '-j' option does not load custom jars to classpath as it was for 
> example in Flink 1.15
> As a result Flink 1.16 SQL Client is not able to discover classes through 
> Flink's Factory discovery mechanism throwing an error like:
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Could not find any factories 
> that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in 
> the classpath.
> {code}
> The same Jar and sample job are working fine with Flink 1.15.
> Flink 1.15.2
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-53-39-330.png! 
> Flink 1.16.1
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-54-32-537.png! 
> ADD JAR command does not solve " Could not find any factories" issue although 
> jar seems to be added:
>  !image-2023-02-10-16-05-12-407.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-opensearch] dannycranmer commented on a diff in pull request #5: [FLINK-30488] OpenSearch implementation of Async Sink

2023-02-10 Thread via GitHub


dannycranmer commented on code in PR #5:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/5#discussion_r1102989699


##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DocSerdeRequest.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.update.UpdateRequest;
+import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.io.stream.BytesStreamOutput;
+import org.opensearch.common.io.stream.InputStreamStreamInput;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Wrapper class around {@link DocWriteRequest} since it does not implement {@link Serializable},
+ * required by AsyncSink scaffolding.
+ *
+ * @param <T> type of the write request
+ */
+@PublicEvolving
+public class DocSerdeRequest<T> implements Serializable {

Review Comment:
   I think the class-level generics are redundant here. We are using `<?>` 
throughout. Consider changing `private final DocWriteRequest<T> request;` to 
`private final DocWriteRequest<?> request;` and removing the class generics, 
which make the Sink interface a bit messy: `extends AsyncSinkBase<InputT, 
DocSerdeRequest<DocWriteRequest<?>>>`
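   
   A short sketch of the suggested shape (an assumption about the refactor,
   not the merged code):
   
   ```java
   import org.opensearch.action.DocWriteRequest;
   
   import java.io.Serializable;
   
   // Sketch: no class-level type parameter; the wrapped request is held as a
   // wildcard DocWriteRequest<?> instead.
   public class DocSerdeRequest implements Serializable {
   
       private final DocWriteRequest<?> request;
   
       private DocSerdeRequest(DocWriteRequest<?> request) {
           this.request = request;
       }
   
       public static DocSerdeRequest from(DocWriteRequest<?> request) {
           return new DocSerdeRequest(request);
       }
   
       public DocWriteRequest<?> getRequest() {
           return request;
       }
   }
   ```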



##
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchAsyncSinkBuilder.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.DocWriteRequest;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Builder to construct an Opensearch compatible {@link OpensearchAsyncSink}.
+ *
+ * <p>The following example shows the minimal setup to create an OpensearchAsyncSink that submits
+ * actions with the default number of actions to buffer (1000).
+ *
+ * <pre>{@code
+ * OpensearchAsyncSink<Tuple2<String, String>> sink = OpensearchAsyncSink
+ *     .<Tuple2<String, String>>builder()
+ *     .setHosts(new HttpHost("localhost:9200"))
+ *     .setElementConverter((element, context) ->
+ *         new IndexRequest("my-index").id(element.f0.toString()).source(element.f1))
+ *     .build();
+ * }</pre>
+ *
+ * @param <InputT> type of the records converted to Opensearch actions
+ */
+@PublicEvolving
+public class OpensearchAsyncSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<
+                InputT, DocSerdeRequest<DocWriteRequest<?>>, OpensearchAsyncSinkBuilder<InputT>> {
+
+    private List<HttpHost> hosts;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+    private Integer connectionTimeout;
+    private Integer connectionRequestTimeout;
+    private Integer socketTimeout;
+    private Boolean 

[GitHub] [flink] wanglijie95 commented on pull request #21813: [hotfix] Fix typo in elastic_scaling.md

2023-02-10 Thread via GitHub


wanglijie95 commented on PR #21813:
URL: https://github.com/apache/flink/pull/21813#issuecomment-1426056540

   Thanks @noahshpak @reswqa, merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect

2023-02-10 Thread weiqinpan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687149#comment-17687149
 ] 

weiqinpan edited comment on FLINK-31003 at 2/10/23 4:20 PM:


The value of marketing_flow_id is not null, but the result of the SQL below is 
empty. So unbelievable.
{code:java}
SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM source;
{code}


was (Author: JIRAUSER298918):
The field of marketing_flow_id is not null, but the result of the SQL below is 
empty. So unbelievable.
{code:java}
SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM source;
{code}

> Flink SQL IF / CASE WHEN Funcation incorrect
> 
>
> Key: FLINK-31003
> URL: https://issues.apache.org/jira/browse/FLINK-31003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>Reporter: weiqinpan
>Priority: Major
>
> When I execute the SQL below using the SQL Client, I found something wrong.
>  
> {code:java}
> CREATE TEMPORARY TABLE source (
>   mktgmsg_biz_type STRING,
>   marketing_flow_id STRING,
>   mktgmsg_campaign_id STRING
> )
> WITH
> (
>   'connector' = 'filesystem',
>   'path' = 'file:///Users/xxx/Desktop/demo.json',
>   'format' = 'json'
> ); 
> -- return correct value('marketing_flow_id') 
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM 
> source;
> -- return incorrect value('')
> SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM 
> source;{code}
> The demo.json data is 
>  
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": 
> "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
>  
>  
> BTW, using CASE WHEN combined with IF / IFNULL also behaves incorrectly.
>  
> {code:java}
> -- return wrong value(''), expect return marketing_flow_id
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` 
> IS NULL, `marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return wrong value('')
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return correct value, the difference is [else return ' ']
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ' '
>   END AS `message_campaign_instance_id` FROM source;
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 merged pull request #21813: [hotfix] Fix typo in elastic_scaling.md

2023-02-10 Thread via GitHub


wanglijie95 merged PR #21813:
URL: https://github.com/apache/flink/pull/21813


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect

2023-02-10 Thread weiqinpan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687149#comment-17687149
 ] 

weiqinpan commented on FLINK-31003:
---

The field of marketing_flow_id is not null, but the result of the SQL below is 
empty. So unbelievable.
{code:java}
SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM source;
{code}

> Flink SQL IF / CASE WHEN Funcation incorrect
> 
>
> Key: FLINK-31003
> URL: https://issues.apache.org/jira/browse/FLINK-31003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>Reporter: weiqinpan
>Priority: Major
>
> When I execute the SQL below using the SQL Client, I found something wrong.
>  
> {code:java}
> CREATE TEMPORARY TABLE source (
>   mktgmsg_biz_type STRING,
>   marketing_flow_id STRING,
>   mktgmsg_campaign_id STRING
> )
> WITH
> (
>   'connector' = 'filesystem',
>   'path' = 'file:///Users/xxx/Desktop/demo.json',
>   'format' = 'json'
> ); 
> -- return correct value('marketing_flow_id') 
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM 
> source;
> -- return incorrect value('')
> SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM 
> source;{code}
> The demo.json data is 
>  
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": 
> "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
>  
>  
> BTW, using CASE WHEN combined with IF / IFNULL also behaves incorrectly.
>  
> {code:java}
> -- return wrong value(''), expect return marketing_flow_id
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` 
> IS NULL, `marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return wrong value('')
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return correct value, the difference is [else return ' ']
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ' '
>   END AS `message_campaign_instance_id` FROM source;
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zekai-li commented on pull request #21675: [FLINK-30690][javadocs][spelling] Fix java documentation and some wor…

2023-02-10 Thread via GitHub


zekai-li commented on PR #21675:
URL: https://github.com/apache/flink/pull/21675#issuecomment-1426034122

   
![image](https://user-images.githubusercontent.com/58294989/218140853-182a708b-408a-497f-8765-9e1b8fa09d2b.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zekai-li commented on pull request #21675: [FLINK-30690][javadocs][spelling] Fix java documentation and some wor…

2023-02-10 Thread via GitHub


zekai-li commented on PR #21675:
URL: https://github.com/apache/flink/pull/21675#issuecomment-1426032058

   > The compile build failed. Could you fix it first?
   
   The compilation error has been fixed.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29825) Improve benchmark stability

2023-02-10 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687143#comment-17687143
 ] 

Piotr Nowojski commented on FLINK-29825:


Yes, that's a good idea :)

> Improve benchmark stability
> ---
>
> Key: FLINK-29825
> URL: https://issues.apache.org/jira/browse/FLINK-29825
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Minor
>
> Currently, regressions are detected by a simple script which may have false 
> positives and false negatives, especially for benchmarks with small absolute 
> values, where small value changes cause large percentage changes. See 
> [here|https://github.com/apache/flink-benchmarks/blob/master/regression_report.py#L132-L136]
>  for details.
> And all benchmarks are executed on one physical machine, so it might happen 
> that hardware issues affect performance, like "[FLINK-18614] Performance 
> regression 2020.07.13".
>  
> This ticket aims to improve the precision and recall of the regression-check 
> script.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect

2023-02-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687120#comment-17687120
 ] 

Martijn Visser commented on FLINK-31003:


[~lincoln.86xy] WDYT?

> Flink SQL IF / CASE WHEN Funcation incorrect
> 
>
> Key: FLINK-31003
> URL: https://issues.apache.org/jira/browse/FLINK-31003
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.15.0, 1.15.1, 1.16.0, 1.15.2, 1.15.3, 1.16.1
>Reporter: weiqinpan
>Priority: Major
>
> When I execute the SQL below using the SQL Client, I found something wrong.
>  
> {code:java}
> CREATE TEMPORARY TABLE source (
>   mktgmsg_biz_type STRING,
>   marketing_flow_id STRING,
>   mktgmsg_campaign_id STRING
> )
> WITH
> (
>   'connector' = 'filesystem',
>   'path' = 'file:///Users/xxx/Desktop/demo.json',
>   'format' = 'json'
> ); 
> -- return correct value('marketing_flow_id') 
> SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM 
> source;
> -- return incorrect value('')
> SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM 
> source;{code}
> The demo.json data is 
>  
> {code:java}
> {"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": 
> "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
>  
>  
> BTW, using CASE WHEN combined with IF / IFNULL also behaves incorrectly.
>  
> {code:java}
> -- return wrong value(''), expect return marketing_flow_id
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` 
> IS NULL, `marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return wrong value('')
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ''
>   END AS `message_campaign_instance_id` FROM source;
> -- return correct value, the difference is [else return ' ']
> select CASE
>   WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN 
> IFNULL(`marketing_flow_id`, '')
>   WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN 
> IFNULL(`mktgmsg_campaign_id`, '')
>   ELSE ' '
>   END AS `message_campaign_instance_id` FROM source;
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31017) Early-started partial match timeout not yield completed matches

2023-02-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687118#comment-17687118
 ] 

Martijn Visser commented on FLINK-31017:


[~Juntao Hu] Are you committing to fixing this in 1.18? Otherwise I'll remove 
the fixVersion for now, because it should only be set if the fix is actually 
expected to make that version.

> Early-started partial match timeout not yield completed matches
> ---
>
> Key: FLINK-31017
> URL: https://issues.apache.org/jira/browse/FLINK-31017
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.17.0, 1.15.3, 1.16.1
>Reporter: Juntao Hu
>Priority: Major
> Fix For: 1.18.0
>
>
> Pattern example:
> {code:java}
> Pattern.begin("A").where(startsWith("a")).oneOrMore().consecutive().greedy()
> .followedBy("B")
> .where(count("A") > 2 ? startsWith("b") : startsWith("c"))
> .within(Time.seconds(3));{code}
> Sequence example, currently without any output:
> a1 a2 a3 a4 c1
> When match [a3, a4, c1] completes, partial match [a1, a2, a3, a4] started 
> earlier, so NFA#processMatchesAccordingToSkipStrategy() won't give any 
> result, which is the expected behavior. However, when partial match [a1, a2, 
> a3, a4] times out, the completed match [a3, a4, c1] should be "freed" from 
> the NFAState to the output.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31018) SQL Client -j option does not load user jars to classpath.

2023-02-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687117#comment-17687117
 ] 

Martijn Visser commented on FLINK-31018:


[~KristoffSC] Could this be related to FLINK-15635 ?

> SQL Client -j option does not load user jars to classpath.
> --
>
> Key: FLINK-31018
> URL: https://issues.apache.org/jira/browse/FLINK-31018
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0, 1.16.1
>Reporter: Krzysztof Chmielewski
>Priority: Minor
> Attachments: image-2023-02-10-15-53-39-330.png, 
> image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png
>
>
> SQL Client '-j' option does not load custom jars to the classpath as it did, 
> for example, in Flink 1.15.
> As a result, the Flink 1.16 SQL Client is not able to discover classes 
> through Flink's Factory discovery mechanism, throwing an error like:
> {code:java}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Could not find any factories 
> that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in 
> the classpath.
> {code}
> The same jar and sample job work fine with Flink 1.15.
> Flink 1.15.2
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-53-39-330.png! 
> Flink 1.16.1
> ./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
>  !image-2023-02-10-15-54-32-537.png! 
> The ADD JAR command does not solve the "Could not find any factories" issue 
> although the jar seems to be added:
>  !image-2023-02-10-16-05-12-407.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29825) Improve benchmark stability

2023-02-10 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687116#comment-17687116
 ] 

Dong Lin commented on FLINK-29825:
--

Thanks [~Yanfei Lei] for the detailed evaluation results! Maybe we can write a 
blog together based on your evaluation results.

> Improve benchmark stability
> ---
>
> Key: FLINK-29825
> URL: https://issues.apache.org/jira/browse/FLINK-29825
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Minor
>
> Currently, regressions are detected by a simple script which may have false 
> positives and false negatives, especially for benchmarks with small absolute 
> values, where small value changes cause large percentage changes. See 
> [here|https://github.com/apache/flink-benchmarks/blob/master/regression_report.py#L132-L136]
>  for details.
> And all benchmarks are executed on one physical machine, so it might happen 
> that hardware issues affect performance, like "[FLINK-18614] Performance 
> regression 2020.07.13".
>  
> This ticket aims to improve the precision and recall of the regression-check 
> script.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31018) SQL Client -j option does not load user jars to classpath.

2023-02-10 Thread Krzysztof Chmielewski (Jira)
Krzysztof Chmielewski created FLINK-31018:
-

 Summary: SQL Client -j option does not load user jars to classpath.
 Key: FLINK-31018
 URL: https://issues.apache.org/jira/browse/FLINK-31018
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.16.1, 1.17.0
Reporter: Krzysztof Chmielewski
 Attachments: image-2023-02-10-15-53-39-330.png, 
image-2023-02-10-15-54-32-537.png, image-2023-02-10-16-05-12-407.png

SQL Client '-j' option does not load custom jars to the classpath as it did, 
for example, in Flink 1.15.
As a result, the Flink 1.16 SQL Client is not able to discover classes through 
Flink's Factory discovery mechanism, throwing an error like:

{code:java}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factories 
that implement 'com.getindata.connectors.http.LookupQueryCreatorFactory' in the 
classpath.
{code}

The same jar and sample job work fine with Flink 1.15.

Flink 1.15.2
./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
 !image-2023-02-10-15-53-39-330.png! 

Flink 1.16.1
./bin/sql-client.sh -j flink-http-connector-0.9.0.jar
 !image-2023-02-10-15-54-32-537.png! 

The ADD JAR command does not solve the "Could not find any factories" issue 
although the jar seems to be added:
 !image-2023-02-10-16-05-12-407.png! 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-02-10 Thread Leonid Ilyevsky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687111#comment-17687111
 ] 

Leonid Ilyevsky commented on FLINK-30998:
-

This morning I successfully tested my solution. The changes are overall pretty 
minor.

Let's discuss so we can implement it. A hypothetical sketch of the API shape 
follows below.
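
The sketch (every name here is an assumption for illustration; only the 
Elasticsearch connector precedent is real):
{code:java}
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.rest.RestStatus;

import java.io.IOException;
import java.io.Serializable;

// Hypothetical failure handler hook, mirroring the Elasticsearch connector's
// ActionRequestFailureHandler idea; this is not the actual Opensearch sink API.
@FunctionalInterface
public interface BulkFailureHandler extends Serializable {

    /** Decide what to do with a single failed bulk action. */
    void onFailure(BulkItemResponse.Failure failure) throws IOException;
}

// Hypothetical builder usage: ignore duplicate-ID rejections, fail otherwise.
//
// OpensearchSink<String> sink = new OpensearchSinkBuilder<String>()
//         .setFailureHandler(failure -> {
//             if (failure.getStatus() == RestStatus.CONFLICT) {
//                 return; // duplicate document ID: skip and continue
//             }
//             throw new IOException(failure.getMessage());
//         })
//         .build();
{code}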

> Add optional exception handler to flink-connector-opensearch
> 
>
> Key: FLINK-30998
> URL: https://issues.apache.org/jira/browse/FLINK-30998
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: 1.16.1
>Reporter: Leonid Ilyevsky
>Priority: Major
>
> Currently, when there is a failure coming from Opensearch, a 
> FlinkRuntimeException is thrown from the OpensearchWriter.java code (line 
> 346). This makes the Flink pipeline fail. There is no way to handle the 
> exception in the client code.
> I suggest adding an option to set a failure handler, similar to the way it 
> is done in the elasticsearch connector. This way the client code has a 
> chance to examine the failure and handle it.
> Here is a use case example where it would be very useful. We are using 
> streams on the Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this is how it works for us with Elasticsearch).
> However, with the opensearch connector, the error comes back saying that the 
> batch failed (even though most of the documents were indexed and only the 
> ones with duplicated IDs were rejected), and the whole Flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-28911) Elasticsearch connector fails build

2023-02-10 Thread Niels Basjes (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niels Basjes closed FLINK-28911.

Resolution: Won't Fix

> Elasticsearch connector fails build
> ---
>
> Key: FLINK-28911
> URL: https://issues.apache.org/jira/browse/FLINK-28911
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.15.1
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>  Labels: pull-request-available
>
> When I run `mvn clean verify` on the ES connector, some of the integration 
> tests fail.
> Assessment so far: the SerializationSchema is not opened, triggering an NPE 
> later on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-18235) Improve the checkpoint strategy for Python UDF execution

2023-02-10 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-18235:
--

Assignee: (was: Dian Fu)

> Improve the checkpoint strategy for Python UDF execution
> 
>
> Key: FLINK-18235
> URL: https://issues.apache.org/jira/browse/FLINK-18235
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, stale-assigned
>
> Currently, when a checkpoint is triggered for the Python operator, all the 
> data buffered will be flushed to the Python worker to be processed. This will 
> increase the overall checkpoint time in case there are a lot of elements 
> buffered and the Python UDF is slow. We should improve the checkpoint 
> strategy to address this. One way to implement this is to control the number 
> of data buffered in the pipeline between the Java/Python processes, similar 
> to what 
> [FLIP-183|https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment]
>  does to control the number of data buffered in the network. We can also let 
> users configure the checkpoint strategy if needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-18235) Improve the checkpoint strategy for Python UDF execution

2023-02-10 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-18235:
---
Priority: Not a Priority  (was: Major)

> Improve the checkpoint strategy for Python UDF execution
> 
>
> Key: FLINK-18235
> URL: https://issues.apache.org/jira/browse/FLINK-18235
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, stale-assigned
>
> Currently, when a checkpoint is triggered for the Python operator, all the 
> data buffered will be flushed to the Python worker to be processed. This will 
> increase the overall checkpoint time in case there are a lot of elements 
> buffered and the Python UDF is slow. We should improve the checkpoint 
> strategy to address this. One way to implement this is to control the number 
> of data buffered in the pipeline between the Java/Python processes, similar 
> to what 
> [FLIP-183|https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment]
>  does to control the number of data buffered in the network. We can also let 
> users configure the checkpoint strategy if needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18235) Improve the checkpoint strategy for Python UDF execution

2023-02-10 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687102#comment-17687102
 ] 

Piotr Nowojski commented on FLINK-18235:


[~dianfu], may I ask if you have considered implementing a snapshot strategy 
similar to the one in the {{AsyncWaitOperator}}? Namely (a minimal sketch 
follows below):
# After serializing incoming records, keep them buffered in the Flink 
operator's memory (not in the state yet!)
# If a record has been successfully processed, remove it from the buffer.
# If a checkpoint happens ({{snapshotState}} call), just copy the in-flight 
records from the buffer to a {{ListState}}; no need to flush or wait for the 
in-flight records to finish processing. 
# During recovery, reprocess the records from the recovered {{ListState}}. 
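
The sketch, assuming a hypothetical operator with an in-memory buffer (the 
class name, state wiring and the Python worker calls are illustrative, not the 
actual PyFlink code):
{code:java}
import org.apache.flink.api.common.state.ListState;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;

// Hypothetical operator fragment showing the AsyncWaitOperator-style strategy.
public class BufferingPythonOperatorSketch {

    // 1. In-flight records, kept only in operator memory between checkpoints.
    private final Deque<byte[]> inFlight = new ArrayDeque<>();
    private ListState<byte[]> bufferedState; // initialized in initializeState()

    public void processElement(byte[] serializedRecord) {
        inFlight.add(serializedRecord);
        // ... hand the record to the Python worker asynchronously ...
    }

    // 2. Called when the Python worker reports a record as processed.
    public void onRecordCompleted(byte[] serializedRecord) {
        inFlight.remove(serializedRecord);
    }

    // 3. Copy the in-flight records to state; no flushing, no waiting.
    public void snapshotState() throws Exception {
        bufferedState.update(new ArrayList<>(inFlight));
    }

    // 4. On recovery, resubmit everything that was in flight.
    public void initializeState() throws Exception {
        for (byte[] record : bufferedState.get()) {
            inFlight.add(record);
            // ... resubmit to the Python worker ...
        }
    }
}
{code}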

> Improve the checkpoint strategy for Python UDF execution
> 
>
> Key: FLINK-18235
> URL: https://issues.apache.org/jira/browse/FLINK-18235
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: auto-deprioritized-major, stale-assigned
>
> Currently, when a checkpoint is triggered for the Python operator, all the 
> data buffered will be flushed to the Python worker to be processed. This will 
> increase the overall checkpoint time in case there are a lot of elements 
> buffered and the Python UDF is slow. We should improve the checkpoint 
> strategy to address this. One way to implement this is to control the number 
> of data buffered in the pipeline between the Java/Python processes, similar 
> to what 
> [FLIP-183|https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment]
>  does to control the number of data buffered in the network. We can also let 
> users configure the checkpoint strategy if needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29825) Improve benchmark stability

2023-02-10 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687098#comment-17687098
 ] 

Piotr Nowojski commented on FLINK-29825:


Thanks a lot for the very detailed comparison [~Yanfei Lei]. Let's go with 
[~lindong]'s proposal!

> Improve benchmark stability
> ---
>
> Key: FLINK-29825
> URL: https://issues.apache.org/jira/browse/FLINK-29825
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Minor
>
> Currently, regressions are detected by a simple script which may have false 
> positives and false negatives, especially for benchmarks with small absolute 
> values, where small value changes cause large percentage changes. See 
> [here|https://github.com/apache/flink-benchmarks/blob/master/regression_report.py#L132-L136]
>  for details.
> And all benchmarks are executed on one physical machine, so it might happen 
> that hardware issues affect performance, like "[FLINK-18614] Performance 
> regression 2020.07.13".
>  
> This ticket aims to improve the precision and recall of the regression-check 
> script.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #19970: [FLINK-27970][tests][JUnit5 migration] flink-hadoop-bulk

2023-02-10 Thread via GitHub


XComp commented on code in PR #19970:
URL: https://github.com/apache/flink/pull/19970#discussion_r1102841851


##
flink-formats/flink-hadoop-bulk/archunit-violations/db4de53e-d09e-4fb0-bdbc-429c1b64686f:
##
@@ -0,0 +1,12 @@
+org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameCommitterHDFSITCase 
does not satisfy: only one of the following predicates match:\

Review Comment:
   Why is this added? It looks like there is some problem with the archunit 
tests and subclasses? :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-31017) Early-started partial match timeout not yield completed matches

2023-02-10 Thread Juntao Hu (Jira)
Juntao Hu created FLINK-31017:
-

 Summary: Early-started partial match timeout not yield completed 
matches
 Key: FLINK-31017
 URL: https://issues.apache.org/jira/browse/FLINK-31017
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.16.1, 1.15.3, 1.17.0
Reporter: Juntao Hu
 Fix For: 1.18.0


Pattern example:
{code:java}
Pattern.begin("A").where(startsWith("a")).oneOrMore().consecutive().greedy()
.followedBy("B")
.where(count("A") > 2 ? startsWith("b") : startsWith("c"))
.within(Time.seconds(3));{code}
Sequence example, currently without any output:

a1 a2 a3 a4 c1

When match [a3, a4, c1] completes, partial match [a1, a2, a3, a4] started 
earlier, so NFA#processMatchesAccordingToSkipStrategy() won't give any result, 
which is the expected behavior. However, when partial match [a1, a2, a3, a4] 
times out, the completed match [a3, a4, c1] should be "freed" from the NFAState 
to the output.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23016) Job client must be a Coordination Request Gateway when submit a job on web ui

2023-02-10 Thread Krzysztof Chmielewski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687090#comment-17687090
 ] 

Krzysztof Chmielewski commented on FLINK-23016:
---

FYI, I got the same error on Flink 1.16.1.

> Job client must be a Coordination Request Gateway when submit a job on web ui 
> --
>
> Key: FLINK-23016
> URL: https://issues.apache.org/jira/browse/FLINK-23016
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.13.1
> Environment: flink: 1.13.1
> flink-cdc: com.alibaba.ververica:flink-connector-postgres-cdc:1.4.0
> jdk:1.8
>Reporter: wen qi
>Priority: Not a Priority
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> auto-deprioritized-minor
> Attachments: WechatIMG10.png, WechatIMG11.png, WechatIMG8.png
>
>
> I used Flink CDC to collect data, and used the Table API to transform the 
> data and write it to another table.
> That all works when I run the code in the IDE and submit the job jar via the 
> CLI, but not via the web UI.
> When I use StreamTableEnvironment.from('table-path').execute(), it fails! 
> Please check my attachments; it seems to be a web UI bug? 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] wanglijie95 commented on pull request #21813: [hotfix] Fix typo in elastic_scaling.md

2023-02-10 Thread via GitHub


wanglijie95 commented on PR #21813:
URL: https://github.com/apache/flink/pull/21813#issuecomment-1425854545

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29825) Improve benchmark stability

2023-02-10 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17687082#comment-17687082
 ] 

Yanfei Lei commented on FLINK-29825:


[~pnowojski] I tried to use hunter to detect regressions, and 
[here|https://docs.google.com/document/d/1coI4eJsauBtrlS1Z77bhGf-hNtDEXbzuwacG5ZPCMc8/edit?usp=sharing]
 are some evaluation results of the three algorithms. I'm not sure I fully 
understand the usage of hunter; it looks like hunter can only detect 
regressions in the history sequence, so I modified it a little bit to detect 
regressions in the latest commit. Correct me if something is wrong in the 
document :D (for context, a sketch of the threshold check being replaced is 
below).
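
A minimal sketch of the kind of percentage-threshold check the current script 
applies (the threshold and the numbers are assumptions for illustration; see 
the linked regression_report.py for the real logic):
{code:java}
// Hypothetical illustration of why a pure percentage threshold misfires on
// benchmarks with small absolute scores; not the actual regression_report.py.
public class RegressionCheckSketch {

    static boolean isRegression(double baselineMedian, double currentScore) {
        double ratioThreshold = 0.25; // assumed tolerance, for illustration
        return currentScore < baselineMedian * (1 - ratioThreshold);
    }

    public static void main(String[] args) {
        // A benchmark at ~1000 ops/ms tolerates a 100-point drop...
        System.out.println(isRegression(1000.0, 900.0)); // false
        // ...while one at ~2 ops/ms is flagged by a 0.6-point wobble.
        System.out.println(isRegression(2.0, 1.4));      // true
    }
}
{code}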

> Improve benchmark stability
> ---
>
> Key: FLINK-29825
> URL: https://issues.apache.org/jira/browse/FLINK-29825
> Project: Flink
>  Issue Type: Improvement
>  Components: Benchmarks
>Affects Versions: 1.17.0
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Minor
>
> Currently, regressions are detected by a simple script which may have false 
> positives and false negatives, especially for benchmarks with small absolute 
> values, small value changes would cause large percentage changes. see 
> [here|https://github.com/apache/flink-benchmarks/blob/master/regression_report.py#L132-L136]
>  for details.
> And all benchmarks are executed on one physical machine, it might happen that 
> hardware issues affect performance, like "[FLINK-18614] Performance 
> regression 2020.07.13".
>  
> This ticket aims to improve the precision and recall of the regression-check 
> script.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

