[jira] [Created] (FLINK-29647) report stackoverflow when using kryo

2022-10-14 Thread Gao Fei (Jira)
Gao Fei created FLINK-29647:
---

 Summary: report stackoverflow when using kryo
 Key: FLINK-29647
 URL: https://issues.apache.org/jira/browse/FLINK-29647
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.13.2
 Environment: flink 1.13.2 version (kryo 2.24 version)
Reporter: Gao Fei


When using Kryo, a StackOverflowError is thrown. The error is as follows:
{code:java}
java.lang.StackOverflowError
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:43)
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:44)
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:44)
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:44)
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:44)
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:44)
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:44)
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:44)
    at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:44)
{code}
I am using a two-phase commit sink to write data to MySQL. The following is part 
of the MySQL sink code:
{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;

// Note: the Tuple2 element types below are assumed (the generics were stripped
// from the original message); DBConnectUtil is my own helper class.
public class MySqlTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, Connection, Void> {

    private static final Logger log = LoggerFactory.getLogger(MySqlTwoPhaseCommitSink.class);

    public MySqlTwoPhaseCommitSink() {
        // The JDBC Connection is used as the transaction handle and is
        // (de)serialized with Kryo, which is what triggers Kryo here.
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    public void invoke(Connection connection, Tuple2<String, Integer> tp,
            Context context) throws Exception {
        log.info("start invoke...");
        // TODO: omitted here
    }

    @Override
    public Connection beginTransaction() throws Exception {
        log.info("start beginTransaction...");
        String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8";
        return DBConnectUtil.getConnection(url, "root", "123456");
    }

    @Override
    public void preCommit(Connection connection) throws Exception {
        log.info("start preCommit...");
    }

    @Override
    public void commit(Connection connection) {
        log.info("start commit...");
        DBConnectUtil.commit(connection);
    }

    @Override
    public void abort(Connection connection) {
        log.info("start abort rollback...");
        DBConnectUtil.rollback(connection);
    }
}
{code}

I also found a similar problem report: 
https://github.com/EsotericSoftware/kryo/issues/341
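
For reference, a minimal sketch of one commonly suggested workaround, assuming the overflow comes from Kryo recursing into the JDBC driver's class hierarchy: checkpoint a small serializable transaction handle instead of the {{Connection}} itself, and rebuild the connection after recovery. All names below are illustrative, not taken from the job above:
{code:java}
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Illustrative transaction handle: it carries just enough state to rebuild
// the connection, so Kryo never has to serialize the Connection class itself.
public class MySqlTxn implements Serializable {
    private final String url;
    private transient Connection connection; // transient: never checkpointed

    public MySqlTxn(String url) {
        this.url = url;
    }

    public Connection getConnection() throws SQLException {
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection(url, "root", "123456");
            connection.setAutoCommit(false); // commit/rollback done in 2PC hooks
        }
        return connection;
    }
}
{code}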





[GitHub] [flink-kubernetes-operator] sap1ens commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument

2022-10-14 Thread GitBox


sap1ens commented on code in PR #403:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996211925


##
flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java:
##
@@ -85,7 +86,8 @@ public void testApplicationCommandAndArgsAdded() {
 containsInAnyOrder(
 CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG,
 "--allowNonRestoredState",
-"false",
+"--fromSavepoint",
+"/tmp/savepoint/path",

Review Comment:
   Because I'm testing the logic I added in the 
`CmdStandaloneJobManagerDecorator` file.






[GitHub] [flink-kubernetes-operator] sap1ens commented on pull request #403: [FLINK-29633] Pass fromSavepoint argument

2022-10-14 Thread GitBox


sap1ens commented on PR #403:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/403#issuecomment-1279584573

   > I agree, it would be nice to have comprehensive tests that actually test 
the allowNonRestoredState logic. On the other hand if we simply enable that 
flag on existing tests at least we provide a regression test for this 
particular issue.
   > 
   > Because the current test would still have failed before this fix if that flag 
were enabled for standalone.
   
   Not quite: `allowNonRestoredState` is pretty much ignored unless 
`fromSavepoint` is also provided. With or without the boolean, it doesn't 
matter. That's why I think we need a real savepoint recovery test, probably as 
a separate issue / PR.
   





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument

2022-10-14 Thread GitBox


gyfora commented on code in PR #403:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996141173


##
flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java:
##
@@ -85,7 +86,8 @@ public void testApplicationCommandAndArgsAdded() {
 containsInAnyOrder(
 CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG,
 "--allowNonRestoredState",
-"false",
+"--fromSavepoint",
+"/tmp/savepoint/path",

Review Comment:
   Why do we need to add this here? Seems like it worked before without it.






[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #403: [FLINK-29633] Pass fromSavepoint argument

2022-10-14 Thread GitBox


gyfora commented on PR #403:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/403#issuecomment-127956

   > > We should think of a way to test these in a reliable way. Testing the 
parameters is not enough, as they might not be understood by the JM, as you 
pointed out.
   > > Maybe we need some e2e tests for standalone mode that test these params 
properly
   > 
   > I agree, e2e testing seems to be the only reliable way to verify that it 
works.
   > 
   > I've checked the `e2e-tests` folder and it looks like it contains only 
fairly basic tests. To properly test this change we need to:
   > 
   > 1. Start a stateful application
   > 2. Take a savepoint
   > 3. Stop it
   > 4. Start another stateful application with a slightly different topology 
so `allowNonRestoredState` is required
   > 5. Confirm it's restored properly
   > 
   > Perhaps we can skip steps 1-3 if we're ok with keeping a savepoint file as 
a test resource. Also, step 4 is particularly tricky: is there a standard Flink 
example that we can use? Otherwise, we'd need to build a new app just for 
testing.
   > 
   > This feels like a new test file.
   > 
   > @gyfora thoughts?
   
   I agree, it would be nice to have comprehensive tests that actually test the 
allowNonRestoredState logic. On the other hand if we simply enable that flag on 
existing tests at least we provide a regression test for this particular issue.
   
   Because the current test would still have failed before this fix if that flag 
were enabled for standalone.
   
   I think we should do something simple for this ticket and open another jira 
for a complete test case.





[GitHub] [flink-kubernetes-operator] sap1ens commented on pull request #403: [FLINK-29633] Pass fromSavepoint argument

2022-10-14 Thread GitBox


sap1ens commented on PR #403:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/403#issuecomment-1279307886

   > We should think of a way to test these in a reliable way. Testing the 
parameters is not enough, as they might not be understood by the JM, as you 
pointed out.
   > 
   > Maybe we need some e2e tests for standalone mode that test these params 
properly
   
   I agree, e2e testing seems to be the only reliable way to verify that it 
works.
   
   I've checked the `e2e-tests` folder and it looks like it contains only 
fairly basic tests. To properly test this change we need to:
   
   1. Start a stateful application
   2. Take a savepoint
   3. Stop it
   4. Start another stateful application with a slightly different topology so 
`allowNonRestoredState` is required
   5. Confirm it's restored properly 
   
   Perhaps we can skip steps 1-3 if we're ok with keeping a savepoint file as a 
test resource. Also, step 4 is particularly tricky: is there a standard Flink 
example that we can use? Otherwise, we'd need to build a new app just for 
testing.
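   
   Roughly, with the Flink CLI the flow could look like this (paths and job jars 
are placeholders; maybe the state machine example that ships with Flink covers 
step 1):
   
   ```sh
   flink run -d examples/streaming/StateMachineExample.jar   # 1. start a stateful app
   flink savepoint <jobId> /tmp/savepoints                   # 2. take a savepoint
   flink cancel <jobId>                                      # 3. stop it
   # 4 + 5: resubmit a slightly different topology, allowing non-restored state
   flink run -s /tmp/savepoints/savepoint-<id> -n modified-topology.jar
   ```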
   
   This feels like a new test file.
   
   @gyfora thoughts?





[jira] [Commented] (FLINK-20350) Incompatible Connectors due to Guava conflict

2022-10-14 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617883#comment-17617883
 ] 

Danny Cranmer commented on FLINK-20350:
---

[~samrat007] yes please do, I will assign to you. Thanks!

> Incompatible Connectors due to Guava conflict
> -
>
> Key: FLINK-20350
> URL: https://issues.apache.org/jira/browse/FLINK-20350
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Google Cloud PubSub, Connectors / Kinesis
>Affects Versions: 1.11.1, 1.11.2
>Reporter: Danny Cranmer
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> *Problem*
> The Kinesis and GCP PubSub connectors do not work together. The following 
> error is thrown.
> {code}
> java.lang.NoClassDefFoundError: Could not initialize class 
> io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:52)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:213)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:102)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  ~[flink-core-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> ~[flink-runtime_2.11-1.11.1.jar:1.11.1]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> ~[flink-runtime_2.11-1.11.1.jar:1.11.1]
>   at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
> {code}
> {code}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId>
>     <version>1.11.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
>     <version>1.11.1</version>
> </dependency>
> {code}
> *Cause*
> This is caused by a Guava dependency conflict:
> - Kinesis Consumer > {{18.0}}
> - GCP PubSub > {{26.0-android}}
> {{NettyChannelBuilder}} fails to initialise due to missing method in guava:
> - 
> {{com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V}}
> *Possible Fixes*
> - Align Guava versions
> - Shade Guava in either connector
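
For illustration, the missing descriptor above decodes to {{checkArgument(boolean, String, char, Object)}}, an overload that only exists in newer Guava releases. The "shade Guava in either connector" option would look roughly like this with the maven-shade-plugin (the relocation pattern is illustrative, not from the issue):
{code:xml}
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <!-- Move Guava into a private namespace so the two
                         connectors stop competing over a single version. -->
                    <relocation>
                        <pattern>com.google.common</pattern>
                        <shadedPattern>shaded.com.google.common</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>
{code}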





[jira] [Assigned] (FLINK-20350) Incompatible Connectors due to Guava conflict

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-20350:
-

Assignee: Samrat Deb

> Incompatible Connectors due to Guava conflict
> -
>
> Key: FLINK-20350
> URL: https://issues.apache.org/jira/browse/FLINK-20350
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Google Cloud PubSub, Connectors / Kinesis
>Affects Versions: 1.11.1, 1.11.2
>Reporter: Danny Cranmer
>Assignee: Samrat Deb
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> *Problem*
> The Kinesis and GCP PubSub connectors do not work together. The following 
> error is thrown.
> {code}
> java.lang.NoClassDefFoundError: Could not initialize class 
> io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:52)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:213)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:102)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  ~[flink-core-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> ~[flink-runtime_2.11-1.11.1.jar:1.11.1]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> ~[flink-runtime_2.11-1.11.1.jar:1.11.1]
>   at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
> {code}
> {code}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId>
>     <version>1.11.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
>     <version>1.11.1</version>
> </dependency>
> {code}
> *Cause*
> This is caused by a Guava dependency conflict:
> - Kinesis Consumer > {{18.0}}
> - GCP PubSub > {{26.0-android}}
> {{NettyChannelBuilder}} fails to initialise due to missing method in guava:
> - 
> {{com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V}}
> *Possible Fixes*
> - Align Guava versions
> - Shade Guava in either connector





[jira] [Commented] (FLINK-20350) Incompatible Connectors due to Guava conflict

2022-10-14 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617877#comment-17617877
 ] 

Samrat Deb commented on FLINK-20350:


[~danny.cranmer] 

Can I pick this task and try to fix it?

 

> Incompatible Connectors due to Guava conflict
> -
>
> Key: FLINK-20350
> URL: https://issues.apache.org/jira/browse/FLINK-20350
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Google Cloud PubSub, Connectors / Kinesis
>Affects Versions: 1.11.1, 1.11.2
>Reporter: Danny Cranmer
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> *Problem*
> The Kinesis and GCP PubSub connectors do not work together. The following 
> error is thrown.
> {code}
> java.lang.NoClassDefFoundError: Could not initialize class 
> io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:52)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:213)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:102)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  ~[flink-core-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> ~[flink-runtime_2.11-1.11.1.jar:1.11.1]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> ~[flink-runtime_2.11-1.11.1.jar:1.11.1]
>   at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
> {code}
> {code}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId>
>     <version>1.11.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
>     <version>1.11.1</version>
> </dependency>
> {code}
> *Cause*
> This is caused by a Guava dependency conflict:
> - Kinesis Consumer > {{18.0}}
> - GCP PubSub > {{26.0-android}}
> {{NettyChannelBuilder}} fails to initialise due to missing method in guava:
> - 
> {{com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V}}
> *Possible Fixes*
> - Align Guava versions
> - Shade Guava in either connector





[GitHub] [flink] Samrat002 commented on pull request #20937: [FLINK-29478][flink-connector/hive] Flink connector hive upgrade to 3.1.3

2022-10-14 Thread GitBox


Samrat002 commented on PR #20937:
URL: https://github.com/apache/flink/pull/20937#issuecomment-1279234327

   @dannycranmer 
   
   I removed `org.apache.hadoop:hadoop-mapreduce-client-core:3.1.0` from 
`NOTICE`.
   
   > Regarding "The bundled Apache Hive org.apache.hive:hive-exec dependency" 
section, I am not sure this is correct. I see we have dropped guava from the 
list, however I can see guava is bundled:
   
   I found this in 
[pom.xml](https://github.com/apache/flink/blob/master/flink-connectors/flink-sql-connector-hive-3.1.2/pom.xml#L167).
 Not sure if this is relevant.
   





[jira] [Commented] (FLINK-12010) Kinesis Producer problems on Alpine Linux image

2022-10-14 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617844#comment-17617844
 ] 

Danny Cranmer commented on FLINK-12010:
---

We introduced a new sink in Flink 1.15 that does not use KPL, and the old sink is 
now deprecated; therefore I am resolving this issue as Won't Do.

> Kinesis Producer problems on Alpine Linux image
> ---
>
> Key: FLINK-12010
> URL: https://issues.apache.org/jira/browse/FLINK-12010
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.7.2
>Reporter:  Mario Georgiev
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> {code:java}
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  sys_siglist: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  backtrace_symbols_fd: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  backtrace: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  __strtok_r
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  __rawmemchr: symbol not found
> {code}
> When building flink from source and using alpine linux, it appears that 
> KinesisProducer does not really like Alpine Linux. Any ideas? 
>  
>  
>  





[jira] [Resolved] (FLINK-12010) Kinesis Producer problems on Alpine Linux image

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-12010.
---
Resolution: Won't Do

> Kinesis Producer problems on Alpine Linux image
> ---
>
> Key: FLINK-12010
> URL: https://issues.apache.org/jira/browse/FLINK-12010
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.7.2
>Reporter:  Mario Georgiev
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> {code:java}
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  sys_siglist: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  backtrace_symbols_fd: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  backtrace: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  __strtok_r
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  __rawmemchr: symbol not found
> {code}
> When building flink from source and using alpine linux, it appears that 
> KinesisProducer does not really like Alpine Linux. Any ideas? 
>  
>  
>  





[jira] [Updated] (FLINK-20350) Incompatible Connectors due to Guava conflict

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-20350:
--
Summary: Incompatible Connectors due to Guava conflict  (was: [Kinesis][GCP 
PubSub] Incompatible Connectors due to Guava conflict)

> Incompatible Connectors due to Guava conflict
> -
>
> Key: FLINK-20350
> URL: https://issues.apache.org/jira/browse/FLINK-20350
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Google Cloud PubSub, Connectors / Kinesis
>Affects Versions: 1.11.1, 1.11.2
>Reporter: Danny Cranmer
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> *Problem*
> The Kinesis and GCP PubSub connectors do not work together. The following 
> error is thrown.
> {code}
> java.lang.NoClassDefFoundError: Could not initialize class 
> io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:52)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:213)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:102)
>  ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  ~[flink-core-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>  ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
> ~[flink-runtime_2.11-1.11.1.jar:1.11.1]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
> ~[flink-runtime_2.11-1.11.1.jar:1.11.1]
>   at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
> {code}
> {code}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId>
>     <version>1.11.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
>     <version>1.11.1</version>
> </dependency>
> {code}
> *Cause*
> This is caused by a Guava dependency conflict:
> - Kinesis Consumer > {{18.0}}
> - GCP PubSub > {{26.0-android}}
> {{NettyChannelBuilder}} fails to initialise due to missing method in guava:
> - 
> {{com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V}}
> *Possible Fixes*
> - Align Guava versions
> - Shade Guava in either connector





[jira] [Resolved] (FLINK-21830) Add support for Session Token with Basic Auth

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-21830.
---
Resolution: Won't Do

This does not make sense, since session tokens will expire. Resolving as Won't 
Do, as there has not been any demand for this feature.

> Add support for Session Token with Basic Auth
> -
>
> Key: FLINK-21830
> URL: https://issues.apache.org/jira/browse/FLINK-21830
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Danny Cranmer
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned
>
> *Background*
> The {{FlinkKinesisConsumer}} and {{FlinkKinesisProducer}} support a variety 
> of AWS authentication mechanisms, including {{BASIC}}. When using {{BASIC}}, 
> {{SESSION_KEY}} is not supported. 
> *Scope*
> Add support for {{SESSION_KEY}} with {{BASIC}} when constructing the AWS 
> credential provider.
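
A sketch of the scoped change, assuming the AWS SDK v1 credential classes used by the legacy connector (parameter names are illustrative):
{code:java}
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;

public final class BasicCredentials {
    // BASIC auth with an optional session token: fall back to plain
    // access-key/secret-key credentials when no token is configured.
    static AWSCredentialsProvider basicProvider(
            String accessKeyId, String secretKey, String sessionToken) {
        AWSCredentials credentials = (sessionToken == null)
                ? new BasicAWSCredentials(accessKeyId, secretKey)
                : new BasicSessionCredentials(accessKeyId, secretKey, sessionToken);
        return new AWSStaticCredentialsProvider(credentials);
    }
}
{code}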





[jira] [Updated] (FLINK-21228) Deadlock in KinesisProducer

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-21228:
--
Summary: Deadlock in KinesisProducer  (was: [Kinesis][Producer] Deadlock in 
KinesisProducer)

> Deadlock in KinesisProducer
> ---
>
> Key: FLINK-21228
> URL: https://issues.apache.org/jira/browse/FLINK-21228
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.12.1
>Reporter: Danny Cranmer
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned
>
> *Background*
>  Kinesis sink failed and resulted in deadlock:
>  - Indefinite backpressure being applied
>  - Exception never thrown causing job to fail
> Application running with:
> {code:java}
> flinkKinesisProducer.setQueueLimit(1);
> flinkKinesisProducer.setFailOnError(true); 
> {code}
>  - {{KinesisProducer}} is waiting for queue to empty before sending the next 
> record 
> ([code|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java#L303])
>  - KPL ran out of memory, which raised an error, however this is processed 
> async 
> ([code|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java#L275])
>  - {{KinesisProducer}} would have rethrown the error and restarted the job, 
> however operator stuck in an infinite loop enforcing the queue limit (which 
> never clears) 
> ([code|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java#L306])
> *Proposal*
>  - Call {{checkAndPropagateAsyncError()}} while enforcing the queue limit in 
> {{enforceQueueLimit()}}, to break the deadlock
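
A minimal sketch of that proposal (simplified; the real {{FlinkKinesisProducer}} method carries more bookkeeping):
{code:java}
// Wait for the KPL queue to drain, but rethrow any asynchronously reported
// error instead of spinning forever on a queue that will never empty.
private void enforceQueueLimit() throws Exception {
    while (producer.getOutstandingRecordsCount() >= queueLimit) {
        checkAndPropagateAsyncError(); // breaks the deadlock on KPL failure
        Thread.sleep(100);             // illustrative wait between checks
    }
}
{code}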





[jira] [Updated] (FLINK-21830) Add support for Session Token with Basic Auth

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-21830:
--
Summary: Add support for Session Token with Basic Auth  (was: 
[kinesis][auth] Add support for Session Token)

> Add support for Session Token with Basic Auth
> -
>
> Key: FLINK-21830
> URL: https://issues.apache.org/jira/browse/FLINK-21830
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: Danny Cranmer
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned
>
> *Background*
> The {{FlinkKinesisConsumer}} and {{FlinkKinesisProducer}} support a variety 
> of AWS authentication mechanisms, including {{BASIC}}. When using {{BASIC}}, 
> {{SESSION_KEY}} is not supported. 
> *Scope*
> Add support for {{SESSION_KEY}} with {{BASIC}} when constructing the AWS 
> credential provider.





[jira] [Resolved] (FLINK-24549) FlinkKinesisConsumer does not work with generic types disabled

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-24549.
---
Resolution: Duplicate

Resolving as duplicate of FLINK-24943

> FlinkKinesisConsumer does not work with generic types disabled
> --
>
> Key: FLINK-24549
> URL: https://issues.apache.org/jira/browse/FLINK-24549
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.12.7, 1.13.6, 1.14.4, 1.15.0
>Reporter: Dawid Wysakowicz
>Assignee: Alexander Egorov
>Priority: Major
>
> FlinkKinesisConsumer uses {{GenericTypeInfo}} internally, which makes it 
> impossible to disable generic types in the entire job.
> {code}
> java.lang.UnsupportedOperationException: Generic types have been disabled in 
> the ExecutionConfig and type 
> org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is treated 
> as a generic type.
> at 
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
> at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
> at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
> at 
> org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
> at 
> org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
> at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Reported in the ML: 
> https://lists.apache.org/thread.html/r6e7723a9d1d77e223fbab481c9a53cbd4a2189ee7442302ee3c33b95%40%3Cuser.flink.apache.org%3E
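
For context, the failure is triggered as soon as the generic-type fallback is switched off, e.g. (sketch; stream name and properties are placeholders):
{code:java}
Properties consumerProps = new Properties(); // region/credentials omitted
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Disallow the Kryo fallback; GenericTypeInfo.createSerializer then throws
// for the connector-internal SequenceNumber type when state is initialized.
env.getConfig().disableGenericTypes();
env.addSource(new FlinkKinesisConsumer<>(
        "my-stream", new SimpleStringSchema(), consumerProps));
{code}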





[jira] [Resolved] (FLINK-23167) Port Kinesis Table API e2e tests to release-1.12 branch

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-23167.
---
Resolution: Won't Fix

> Port Kinesis Table API e2e tests to release-1.12 branch
> ---
>
> Key: FLINK-23167
> URL: https://issues.apache.org/jira/browse/FLINK-23167
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: 1.12.6
>Reporter: Emre Kartoglu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> kinesis
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> https://issues.apache.org/jira/browse/FLINK-20042 added e2e tests for the 
> Kinesis Table API. This was only done for versions >=1.13 however.
> We need to port these tests to the release-1.12 branch as version 1.12 
> supports the same functionality that needs the same (or a similar) test.





[jira] [Commented] (FLINK-23167) Port Kinesis Table API e2e tests to release-1.12 branch

2022-10-14 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617841#comment-17617841
 ] 

Danny Cranmer commented on FLINK-23167:
---

1.12 is now out of support, resolving as won't do

> Port Kinesis Table API e2e tests to release-1.12 branch
> ---
>
> Key: FLINK-23167
> URL: https://issues.apache.org/jira/browse/FLINK-23167
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: 1.12.6
>Reporter: Emre Kartoglu
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> kinesis
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> https://issues.apache.org/jira/browse/FLINK-20042 added e2e tests for the 
> Kinesis Table API. This was only done for versions >=1.13 however.
> We need to port these tests to the release-1.12 branch as version 1.12 
> supports the same functionality that needs the same (or a similar) test.





[jira] [Updated] (FLINK-24549) FlinkKinesisConsumer does not work with generic types disabled

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-24549:
--
Fix Version/s: (was: 1.17.0)

> FlinkKinesisConsumer does not work with generic types disabled
> --
>
> Key: FLINK-24549
> URL: https://issues.apache.org/jira/browse/FLINK-24549
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.12.7, 1.13.6, 1.14.4, 1.15.0
>Reporter: Dawid Wysakowicz
>Assignee: Alexander Egorov
>Priority: Major
>
> FlinkKinesisConsumer uses {{GenericTypeInfo}} internally, which makes it 
> impossible to disable generic types in the entire job.
> {code}
> java.lang.UnsupportedOperationException: Generic types have been disabled in 
> the ExecutionConfig and type 
> org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is treated 
> as a generic type.
> at 
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
> at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
> at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
> at 
> org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
> at 
> org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
> at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Reported in the ML: 
> https://lists.apache.org/thread.html/r6e7723a9d1d77e223fbab481c9a53cbd4a2189ee7442302ee3c33b95%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-24913) Upgrade KPL version in flink-connector-kinesis to support ARM

2022-10-14 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617839#comment-17617839
 ] 

Danny Cranmer commented on FLINK-24913:
---

Given that we added a new Kinesis sink in Flink 1.15 that does not use KPL, and 
the old sink is legacy, I am resolving this as Won't Do. The new KPL versions 
also pull in an overly large dependency tree.

> Upgrade KPL version in flink-connector-kinesis to support ARM
> -
>
> Key: FLINK-24913
> URL: https://issues.apache.org/jira/browse/FLINK-24913
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: 1.11.4, 1.12.5, 1.13.3, 1.14.1
> Environment: Java 11 + EMR6.4 + Flink 1.13.1 + Kinesis.
> The use case is using EMR Flink Application with c6g.4xlarge(Graviton ARM64) 
> based instances to connect to Kinesis Data streams.
>Reporter: Melody
>Assignee: Melody
>Priority: Major
>  Labels: AWS, stale-assigned
>
> KPL supports Graviton (ARM64) based EC2 instances since the [0.14.4 
> release|https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.4].
>  However, the latest flink-connector-kinesis in Flink doesn't support 0.14.4 
> or higher versions yet. Please upgrade the KPL to support ARM.
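
The requested change is essentially a version bump of the KPL dependency inside {{flink-connector-kinesis}}, roughly (version taken from the linked release):
{code:xml}
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.14.4</version> <!-- first KPL release with ARM64 support -->
</dependency>
{code}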





[jira] [Resolved] (FLINK-24913) Upgrade KPL version in flink-connector-kinesis to support ARM

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-24913.
---
Resolution: Won't Do

> Upgrade KPL version in flink-connector-kinesis to support ARM
> -
>
> Key: FLINK-24913
> URL: https://issues.apache.org/jira/browse/FLINK-24913
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: 1.11.4, 1.12.5, 1.13.3, 1.14.1
> Environment: Java 11 + EMR6.4 + Flink 1.13.1 + Kinesis.
> The use case is using EMR Flink Application with c6g.4xlarge(Graviton ARM64) 
> based instances to connect to Kinesis Data streams.
>Reporter: Melody
>Assignee: Melody
>Priority: Major
>  Labels: AWS, stale-assigned
>
> KPL supports Graviton (ARM64) based EC2 instances since the [0.14.4 
> release|https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.4].
>  However, the latest flink-connector-kinesis in Flink doesn't support 0.14.4 
> or higher versions yet. Please upgrade the KPL to support ARM.





[GitHub] [flink] dannycranmer commented on pull request #20257: [FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for SequenceNumber class in Kinesis Consumer

2022-10-14 Thread GitBox


dannycranmer commented on PR #20257:
URL: https://github.com/apache/flink/pull/20257#issuecomment-1279205494

   @Cyberness please also squash commits and update commit message as per the 
[contribution 
guide](https://flink.apache.org/contributing/contribute-documentation.html#submit-your-contribution)





[jira] [Resolved] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-25729.
---
Resolution: Duplicate

> Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on 
> the AWS SDK for Java 2.x
> -
>
> Key: FLINK-25729
> URL: https://issues.apache.org/jira/browse/FLINK-25729
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Priority: Major
>
> h2. Motivation
> *User stories:*
> As a Flink user, I’d like to use a Kinesis Consumer based on the AWS SDK for 
> Java 2.x rather than the Kinesis Consumer Library.
>  
>  * Maintain all the features of the current consumer at 
> `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` in the 
> new source
>  * The new sink should live in the module 
> `flink-connector-aws-kinesis-streams` (old: `flink-connector-kinesis`)





[jira] [Commented] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x

2022-10-14 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617831#comment-17617831
 ] 

Danny Cranmer commented on FLINK-25729:
---

Yes, I believe so; resolving this as a duplicate.

> Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on 
> the AWS SDK for Java 2.x
> -
>
> Key: FLINK-25729
> URL: https://issues.apache.org/jira/browse/FLINK-25729
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.0
>Reporter: Zichen Liu
>Priority: Major
>
> h2. Motivation
> *User stories:*
> As a Flink user, I’d like to use a Kinesis Consumer based on the AWS SDK for 
> Java 2.x rather than the Kinesis Consumer Library.
>  
>  * Maintain all the features of the current consumer at 
> `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` in the 
> new source
>  * The new sink should live in the module 
> `flink-connector-aws-kinesis-streams` (old: `flink-connector-kinesis`)





[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995921557


##
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderFactory.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import java.io.Serializable;
+
+/**
+ * A factory for creating source reader instances.
+ *
+ * @param <T> The type of the output elements.
+ */
+public interface SourceReaderFactory<T, SplitT extends SourceSplit> extends Serializable {

Review Comment:
   missing `@Public`; hopefully this fixes one of the japicmp violations






[jira] [Commented] (FLINK-26657) Resilient Kinesis consumption

2022-10-14 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617828#comment-17617828
 ] 

Danny Cranmer commented on FLINK-26657:
---

Resolving as "won't do" due to inactivity 

> Resilient Kinesis consumption
> -
>
> Key: FLINK-26657
> URL: https://issues.apache.org/jira/browse/FLINK-26657
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: John Karp
>Assignee: John Karp
>Priority: Major
>
> Currently, any sort of error reading from a Kinesis stream will quickly 
> result in a job-killing error. If the error is not 'recoverable', failure 
> will be instant, or if it is 'recoverable', there will be a fixed number of 
> retries before the job fails -- and for some operations such as GetRecords, 
> the retries can be exhausted in just a few seconds. Furthermore, 
> KinesisProxy.isRecoverableSdkClientException() and 
> KinesisProxy.isRecoverableException() only recognize very narrow categories 
> of errors as even being recoverable.
> So for example if a Flink job is aggregating Kinesis streams from multiple 
> regions, the Flink job will not be able to make any forward progress on 
> processing data from any region if there is a single-region outage, since the 
> job will likely fail before any checkpoint can be completed. For some use 
> cases, it is better to proceed with processing most of the data, than to wait 
> indefinitely for the problematic region to recover.
> One mitigation is to increase all of the ConsumerConfig timeouts to be very 
> high. However, this will only affect error handling for 'recoverable' 
> exceptions, and depending on the nature of the regional failure, the 
> resulting errors may not be classified as 'recoverable'.
> Proposed mitigation: add a 'soft failure' mode to the Kinesis consumer, where 
> most errors arising from Kinesis operations are considered recoverable, and 
> there are unlimited retries. (Except for perhaps EFO de-registration, which 
> I'm assuming needs to complete in a timely fashion. Also, it looks like 
> ExpiredIteratorException needs to bubble up to 
> PollingRecordPublisher.getRecords() without retries.)





[jira] [Resolved] (FLINK-26657) Resilient Kinesis consumption

2022-10-14 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-26657.
---
Resolution: Won't Do

> Resilient Kinesis consumption
> -
>
> Key: FLINK-26657
> URL: https://issues.apache.org/jira/browse/FLINK-26657
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kinesis
>Reporter: John Karp
>Assignee: John Karp
>Priority: Major
>
> Currently, any sort of error reading from a Kinesis stream will quickly 
> result in a job-killing error. If the error is not 'recoverable', failure 
> will be instant, or if it is 'recoverable', there will be a fixed number of 
> retries before the job fails -- and for some operations such as GetRecords, 
> the retries can be exhausted in just a few seconds. Furthermore, 
> KinesisProxy.isRecoverableSdkClientException() and 
> KinesisProxy.isRecoverableException() only recognize very narrow categories 
> of errors as even being recoverable.
> So for example if a Flink job is aggregating Kinesis streams from multiple 
> regions, the Flink job will not be able to make any forward progress on 
> processing data from any region if there is a single-region outage, since the 
> job will likely fail before any checkpoint can be completed. For some use 
> cases, it is better to proceed with processing most of the data, than to wait 
> indefinitely for the problematic region to recover.
> One mitigation is to increase all of the ConsumerConfig timeouts to be very 
> high. However, this will only affect error handling for 'recoverable' 
> exceptions, and depending on the nature of the regional failure, the 
> resulting errors may not be classified as 'recoverable'.
> Proposed mitigation: add a 'soft failure' mode to the Kinesis consumer, where 
> most errors arising from Kinesis operations are considered recoverable, and 
> there are unlimited retries. (Except for perhaps EFO de-registration, which 
> I'm assuming needs to complete in a timely fashion. Also, it looks like 
> ExpiredIteratorException needs to bubble up to 
> PollingRecordPublisher.getRecords() without retries.)





[jira] [Commented] (FLINK-29572) Flink Task Manager skip loopback interface for resource manager registration

2022-10-14 Thread Kevin Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617811#comment-17617811
 ] 

Kevin Li commented on FLINK-29572:
--

No, it wouldn't. This problem happens for Kubernetes deployments, where all task 
managers share the same configuration, mounted from a ConfigMap. I think we just 
need a configuration flag to skip the loopback check, since we know the Job 
Manager is not running on localhost.

As the documentation indicates:

{code:java}
The external address of the network interface where the TaskManager is exposed. 
Because different TaskManagers need different values for this option, usually 
it is specified in an additional non-shared TaskManager-specific config file.
{code}


> Flink Task Manager skip loopback interface for resource manager registration
> 
>
> Key: FLINK-29572
> URL: https://issues.apache.org/jira/browse/FLINK-29572
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.15.2
> Environment: Flink 1.15.2
> Kubernetes with Istio Proxy
>Reporter: Kevin Li
>Priority: Major
>
> Currently the Flink Task Manager tries different local interfaces when binding 
> a connection to the Resource Manager. The first one is the loopback interface. 
> Normally, if the Job Manager is running on a remote host/container, connecting 
> via the loopback interface fails and the Task Manager picks up the correct IP 
> address instead.
> However, if the Task Manager is running behind some proxy, the loopback 
> interface can reach the remote host as well. This results in 127.0.0.1 being 
> reported to the Resource Manager during registration even though the Job 
> Manager/Resource Manager runs on a remote host, and problems follow. For us, 
> only one Task Manager can register in this case.
> I suggest adding a configuration option to skip the loopback interface check 
> when we know the Job/Resource Manager is running on a remote host/container.
>  
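A minimal sketch of the kind of candidate filtering the proposed flag would enable, using plain JDK APIs (the flag itself is hypothetical):
{code:java}
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// With skipLoopback=true, 127.0.0.1/::1 are never offered as bind candidates.
static List<InetAddress> candidateAddresses(boolean skipLoopback) throws Exception {
    List<InetAddress> result = new ArrayList<>();
    for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
        for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
            if (skipLoopback && addr.isLoopbackAddress()) {
                continue; // skip loopback when the JM is known to be remote
            }
            result.add(addr);
        }
    }
    return result;
}
{code}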



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21075: [FLINK-28085][Connector/Pulsar] Close the pending Pulsar transactions before closing pipeline

2022-10-14 Thread GitBox


flinkbot commented on PR #21075:
URL: https://github.com/apache/flink/pull/21075#issuecomment-1279137259

   
   ## CI report:
   
   * cdc2a58e9c51cd7c6e9293b4cd1418f9ccd5c374 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26027) Add FLIP-33 metrics to new PulsarSink

2022-10-14 Thread Yufan Sheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yufan Sheng updated FLINK-26027:

Fix Version/s: 1.17.0

> Add FLIP-33 metrics to new PulsarSink
> -
>
> Key: FLINK-26027
> URL: https://issues.apache.org/jira/browse/FLINK-26027
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Pulsar
>Reporter: Yufan Sheng
>Assignee: Yufei Zhang
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29633) Operator doesn't pass initialSavepointPath as fromSavepoint argument

2022-10-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-29633:
---
Priority: Critical  (was: Major)

> Operator doesn't pass initialSavepointPath as fromSavepoint argument
> 
>
> Key: FLINK-29633
> URL: https://issues.apache.org/jira/browse/FLINK-29633
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.2.0
>Reporter: Yaroslav Tkachenko
>Assignee: Yaroslav Tkachenko
>Priority: Critical
>  Labels: pull-request-available
>
> The Kubernetes Operator doesn't pass *initialSavepointPath* from the JobSpec 
> as a *--fromSavepoint* argument to the JobManager. The operator does update 
> the configuration, but in the standalone mode, Flink actually [overrides 
> that|https://github.com/apache/flink/blob/012dc6a9b800bae0cfa5250d38de992ccbabc015/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java#L57-L63]
>  based on the command-line arguments. 
> *CmdStandaloneJobManagerDecorator* should be updated to include 
> *fromSavepoint.*
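A rough sketch of the missing piece inside the jobmanager command decorator (the accessor names are guesses for illustration, not the operator's actual API):
{code:java}
List<String> args = new ArrayList<>();
String savepointPath = jobSpec.getInitialSavepointPath(); // hypothetical accessor
if (savepointPath != null) {
    // forwarded as CLI args so StandaloneApplicationClusterEntryPoint sees them
    args.add("--fromSavepoint");
    args.add(savepointPath);
}
if (Boolean.TRUE.equals(jobSpec.getAllowNonRestoredState())) { // hypothetical accessor
    args.add("--allowNonRestoredState");
}
{code}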



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] syhily opened a new pull request, #21075: [FLINK-28085][Connector/Pulsar] Close the pending Pulsar transactions before closing pipeline

2022-10-14 Thread GitBox


syhily opened a new pull request, #21075:
URL: https://github.com/apache/flink/pull/21075

   ## What is the purpose of the change
   
   PulsarUnorderedSourceReader should close all the pending transactions before shutdown.
   
   ## Brief change log
   
   1. Close the split fetchers related to the finished splits.
   2. Close all the pending transactions before shutdown.
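   
   For illustration, closing the transactions amounts to something like the following with the Pulsar client API (using `org.apache.pulsar.client.api.transaction.Transaction`; the collection name is a placeholder):
   
   ```java
   // Abort whatever transactions are still pending when the reader shuts down,
   // so the broker does not keep them open until the transaction timeout.
   for (Transaction txn : pendingTransactions) {
       try {
           txn.abort().get();
       } catch (Exception e) {
           // log and keep closing the remaining transactions
       }
   }
   ```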
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*PulsarUnorderedSourceITCase*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp merged pull request #18760: [hotfix][docs] Fix minor grammar and spelling mistakes

2022-10-14 Thread GitBox


XComp merged PR #18760:
URL: https://github.com/apache/flink/pull/18760


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on pull request #18760: [hotfix][docs] Fix minor grammar and spelling mistakes

2022-10-14 Thread GitBox


XComp commented on PR #18760:
URL: https://github.com/apache/flink/pull/18760#issuecomment-1279106087

   Only documentation was affected. I'm gonna merge right away without waiting 
for CI.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-29646:
---
Description: 
The SQL gateway should return simpler exception information.
For example, if I execute a SQL statement through the SQL gateway but my statement has a syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the response returned by the server is too verbose to quickly find the key information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think the SQL gateway should set a higher bar for its exception messages.

[~Wencong Liu] 

  was:
sql gateway Should return simpler exception information
for example:
  If i execute a sql statement through sql gateway but my statement has syntax 
error  :[ inset into tablea select * from tableb  ]

When I get exception information. The abnormal information returned by the 
server is too redundant to quickly find the Key Information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to see it quickly. I think sql gateway 
should have higher requirements for exception information.
 
 
 
 


> SQL Gateway should return a simpler error message
> -
>
> Key: FLINK-29646
> URL: https://issues.apache.org/jira/browse/FLINK-29646
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yuanfenghu
>Priority: Minor
> Fix For: 1.16.0
>
>
> The SQL gateway should return simpler exception information.
> For example, if I execute a SQL statement through the SQL gateway but my statement has a syntax error: [ inset into tablea select * from tableb ]
> When I get the exception information, the response returned by the server is too verbose to quickly find the key information. 
> {code:java}
>  org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
> fetchResults.
>     at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> 

[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-29646:
---
Description: 
The SQL gateway should return simpler exception information.
For example, if I execute a SQL statement through the SQL gateway but my statement has a syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the response returned by the server is too verbose to quickly find the key information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think the SQL gateway should set a higher bar for its exception messages.

[~Wencong Liu] 

  was:
sql gateway Should return simpler exception information
for example:
  If i execute a sql statement through sql gateway but my statement has syntax 
error  :[ inset into tablea select * from tableb  ]

When I get exception information. The abnormal information returned by the 
server is too redundant to quickly find the Key Information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to see it quickly. I think sql gateway 
should have higher requirements for exception information.

[~Wencong Liu] 


> SQL Gateway should return a simpler error message
> -
>
> Key: FLINK-29646
> URL: https://issues.apache.org/jira/browse/FLINK-29646
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yuanfenghu
>Priority: Minor
> Fix For: 1.16.0
>
>
> The SQL gateway should return simpler exception information.
> For example, if I execute a SQL statement through the SQL gateway but my statement has a syntax error: [ inset into tablea select * from tableb ]
> When I get the exception information, the response returned by the server is too verbose to quickly find the key information. 
> {code:java}
>  org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
> fetchResults.
>     at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> 

[GitHub] [flink] RyanSkraba commented on pull request #18760: [hotfix][docs] Fix minor grammar and spelling mistakes

2022-10-14 Thread GitBox


RyanSkraba commented on PR #18760:
URL: https://github.com/apache/flink/pull/18760#issuecomment-1279095126

   Rebased, squashed into one commit and fixed merge conflicts.  Can you take a 
look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-14 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-29646:
--

 Summary: SQL Gateway should return a simpler error message
 Key: FLINK-29646
 URL: https://issues.apache.org/jira/browse/FLINK-29646
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Gateway
Affects Versions: 1.16.0
Reporter: yuanfenghu
 Fix For: 1.16.0


The SQL gateway should return simpler exception information.
For example, if I execute a SQL statement through the SQL gateway but my statement has a syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the response returned by the server is too verbose to quickly find the key information. 
{code:java}
// code placeholder
 {code}
 
The key information is:
{code:java}
// code placeholder
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think the SQL gateway should set a higher bar for its exception messages.
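For illustration, a minimal sketch of the root-cause unwrapping the gateway could perform before answering the client:
{code:java}
// Walk the cause chain to its root and report only that message,
// e.g. the ParseException text above.
Throwable t = exception;
while (t.getCause() != null && t.getCause() != t) {
    t = t.getCause();
}
String keyInformation = t.getClass().getName() + ": " + t.getMessage();
{code}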
 
 
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message

2022-10-14 Thread yuanfenghu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuanfenghu updated FLINK-29646:
---
Description: 
The SQL gateway should return simpler exception information.
For example, if I execute a SQL statement through the SQL gateway but my statement has a syntax error: [ inset into tablea select * from tableb ]

When I get the exception information, the response returned by the server is too verbose to quickly find the key information. 
{code:java}
 {code}
 
The key information is:
{code:java}
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to spot it quickly. I think the SQL gateway should set a higher bar for its exception messages.
 
 
 
 

  was:
sql gateway Should return simpler exception information
for example:
  If i execute a sql statement through sql gateway but my statement has syntax 
error  :[ inset into tablea select * from tableb  ]

When I get exception information. The abnormal information returned by the 
server is too redundant to quickly find the Key Information. 
{code:java}
// code placeholder
 {code}
 
The key information is:
{code:java}
// code placeholder
org.apache.flink.sql.parser.impl.ParseException: Encountered "inset" at line 4, 
column 2. Was expecting one of:     "INSERT" ...     "UPSERT" ... {code}
However, it is difficult for the client to see it quickly. I think sql gateway 
should have higher requirements for exception information.
 
 
 
 


> SQL Gateway should return a simpler error message
> -
>
> Key: FLINK-29646
> URL: https://issues.apache.org/jira/browse/FLINK-29646
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: yuanfenghu
>Priority: Minor
> Fix For: 1.16.0
>
>
> The SQL gateway should return simpler exception information.
> For example, if I execute a SQL statement through the SQL gateway but my statement has a syntax error: [ inset into tablea select * from tableb ]
> When I get the exception information, the response returned by the server is too verbose to quickly find the key information. 
> {code:java}
>  org.apache.flink.table.gateway.api.utils.SqlGatewayException: 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to 
> fetchResults.
>     at 
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at 
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>     at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at 
> 

[GitHub] [flink] flinkbot commented on pull request #21074: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink

2022-10-14 Thread GitBox


flinkbot commented on PR #21074:
URL: https://github.com/apache/flink/pull/21074#issuecomment-1279082363

   
   ## CI report:
   
   * 6004175f2aa10d2d805a3a575e5efa0954482d1d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26822) Add Source implementation for Cassandra connector

2022-10-14 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617758#comment-17617758
 ] 

Etienne Chauchot commented on FLINK-26822:
--

As promised, I have developed the Cassandra source connector based on FLIP-27. I've just submitted the PR.

> Add Source implementation for Cassandra connector
> -
>
> Key: FLINK-26822
> URL: https://issues.apache.org/jira/browse/FLINK-26822
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Cassandra
>Reporter: Martijn Visser
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
>
> The Cassandra connector is currently only available as a Sink implementation 
> for DataStream users. We should also make it available as a Source. This 
> should be done via Flink's Source API. More details can be found in FLIP-27 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28959) 504 gateway timeout when consuming a large number of topics using TopicPattern

2022-10-14 Thread Yufan Sheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yufan Sheng updated FLINK-28959:

Affects Version/s: (was: 1.14.5)

> 504 gateway timeout when consuming a large number of topics using TopicPattern
> -
>
> Key: FLINK-28959
> URL: https://issues.apache.org/jira/browse/FLINK-28959
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.1
> Environment: * flink-connector-pulsar: 1.15.0
> * Pulsar 2.9.2
>Reporter: Yufan Sheng
>Priority: Major
> Fix For: 1.17.0, 1.15.3, 1.14.7, 1.16.1
>
>
> Our situation is as follows:
> * In a single namespace, more than 300 topics (partitioned topics with a single partition each) will trigger this error;
> * The error still exists after expanding resources;
> * Each Flink client program consumes 30 to 50 topics; the error is bound to be reported after five consecutive programs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28959) 504 gateway timeout when consuming a large number of topics using TopicPattern

2022-10-14 Thread Yufan Sheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yufan Sheng updated FLINK-28959:

Fix Version/s: (was: 1.14.7)

> 504 gateway timeout when consuming a large number of topics using TopicPattern
> -
>
> Key: FLINK-28959
> URL: https://issues.apache.org/jira/browse/FLINK-28959
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.1
> Environment: * flink-connector-pulsar: 1.15.0
> * Pulsar 2.9.2
>Reporter: Yufan Sheng
>Priority: Major
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> Our situation is as follows:
> * In a single namespace, more than 300 topics (partitioned topics with a single partition each) will trigger this error;
> * The error still exists after expanding resources;
> * Each Flink client program consumes 30 to 50 topics; the error is bound to be reported after five consecutive programs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21073: [FLINK-26822] Add Source for Cassandra connector

2022-10-14 Thread GitBox


flinkbot commented on PR #21073:
URL: https://github.com/apache/flink/pull/21073#issuecomment-1279069831

   
   ## CI report:
   
   * 39b9a1b91a23296082e30e64ffeba6a838805952 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] syhily opened a new pull request, #21074: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink

2022-10-14 Thread GitBox


syhily opened a new pull request, #21074:
URL: https://github.com/apache/flink/pull/21074

   ## What is the purpose of the change
   
   PulsarSink is slow when we use `At-Least-Once` or `Exactly-Once` delivery. This is because PulsarSink uses the `MailboxExecutor` to write messages. A lot of unnecessary context switches and lock acquisitions happen when using this writing policy. We should drop the `MailboxExecutor` and send messages directly for better performance.
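   
   For illustration, a minimal sketch of the direct-send idea with the Pulsar client API (`producer` and `element` are assumed to be in scope; failure handling shortened):
   
   ```java
   // Send asynchronously on the producer instead of scheduling the write through
   // the task mailbox; the client's own memory limit provides the backpressure.
   CompletableFuture<MessageId> future =
           producer.newMessage().value(element).sendAsync();
   future.whenComplete((messageId, error) -> {
       if (error != null) {
           // remember the failure so the writer can rethrow it on the next flush
       }
   });
   ```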
   
   ## Brief change log
   
 - Deprecated the now-useless `PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM` option, since the Pulsar client has its own memory-based send rate limit.
 - Remove the use of `MailboxExecutor` in `PulsarWriter`.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *PulsarWriterTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-26822) Add Source implementation for Cassandra connector

2022-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-26822:
---
Labels: pull-request-available  (was: )

> Add Source implementation for Cassandra connector
> -
>
> Key: FLINK-26822
> URL: https://issues.apache.org/jira/browse/FLINK-26822
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Cassandra
>Reporter: Martijn Visser
>Assignee: Etienne Chauchot
>Priority: Major
>  Labels: pull-request-available
>
> The Cassandra connector is currently only available as a Sink implementation 
> for DataStream users. We should also make it available as a Source. This 
> should be done via Flink's Source API. More details can be found in FLIP-27 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] echauchot opened a new pull request, #21073: [FLINK-26822] Add Source for Cassandra connector

2022-10-14 Thread GitBox


echauchot opened a new pull request, #21073:
URL: https://github.com/apache/flink/pull/21073

   ## What is the purpose of the change
   
   Add a flink source for Cassandra database compliant with 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
   
   ## Brief change log
   
   Implement everything needed for the source: enumerator, Cassandra splits, source reader, split reader, etc., plus the related unit tests and integration tests.
   
   Moved the annotated POJO class used in the Cassandra batch example to a common place (tests / cassandra utils) so it can be used by both the example and the source tests.
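   
   For readers unfamiliar with FLIP-27, a rough outline of the shape such a source takes (all type names below are placeholders, not this PR's actual classes):
   
   ```java
   import org.apache.flink.api.connector.source.*;
   import org.apache.flink.core.io.SimpleVersionedSerializer;
   
   // Placeholder split: identifies one token-ring range to read.
   class RingRangeSplit implements SourceSplit {
       private final String id;
       RingRangeSplit(String id) { this.id = id; }
       @Override public String splitId() { return id; }
   }
   
   // Placeholder enumerator checkpoint state, e.g. the not-yet-assigned splits.
   class EnumState { }
   
   class CassandraSourceSketch implements Source<String, RingRangeSplit, EnumState> {
       @Override public Boundedness getBoundedness() {
           return Boundedness.BOUNDED; // a table scan is finite
       }
       @Override public SourceReader<String, RingRangeSplit> createReader(SourceReaderContext ctx) {
           throw new UnsupportedOperationException("reader omitted in this sketch");
       }
       @Override public SplitEnumerator<RingRangeSplit, EnumState> createEnumerator(
               SplitEnumeratorContext<RingRangeSplit> ctx) {
           throw new UnsupportedOperationException("enumerator omitted in this sketch");
       }
       @Override public SplitEnumerator<RingRangeSplit, EnumState> restoreEnumerator(
               SplitEnumeratorContext<RingRangeSplit> ctx, EnumState checkpoint) {
           throw new UnsupportedOperationException("restore omitted in this sketch");
       }
       @Override public SimpleVersionedSerializer<RingRangeSplit> getSplitSerializer() {
           throw new UnsupportedOperationException("serializer omitted in this sketch");
       }
       @Override public SimpleVersionedSerializer<EnumState> getEnumeratorCheckpointSerializer() {
           throw new UnsupportedOperationException("serializer omitted in this sketch");
       }
   }
   ```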
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - CassandraSourceITCase: integration test using the new source test 
framework and a Cassandra test container
   - CassandraEnumeratorStateSerializerTest and CassandraSplitSerializerTest:  
for serialization tests
   - CassandraQueryTest: tests for query generation and query sanity checks
   - SplitsGeneratorTest: test for ring ranges creation (use for splits)
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): add connector base 
and connector test utils
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: adds a new `@Public(Evolving)` API
 - The serializers: add some new ones
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? javadoc
   
   R: @zentol 
   CC: @MartijnVisser 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21072: [FLINK-28960][Connector/Pulsar] Add jaxb-api back to pulsar-client-all dependencies.

2022-10-14 Thread GitBox


flinkbot commented on PR #21072:
URL: https://github.com/apache/flink/pull/21072#issuecomment-1279052707

   
   ## CI report:
   
   * 647e279d5e4295d56ecc58184870317a6a67b1db UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21071: [FLINK-29433][Connector/Pulsar] Support Auth through the builder pattern in Pulsar connector.

2022-10-14 Thread GitBox


flinkbot commented on PR #21071:
URL: https://github.com/apache/flink/pull/21071#issuecomment-1279044483

   
   ## CI report:
   
   * df6929923a7dbc1d868e7dc5e4533213050378f6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995787574


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 2", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(100),
+                Types.LONG);
+```
+
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   There does not appear to be any guarantee that `pollNext` will be called 
between checkpoints.
   
   With that in mind I'd be alright with the current approach. The new 
checkpointing REST API (FLINK-27101) could be a way for users to work around 
this problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-28960) Pulsar throws java.lang.NoClassDefFoundError: javax/xml/bind/annotation/XmlElement

2022-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-28960:
---
Labels: pull-request-available  (was: )

> Pulsar throws java.lang.NoClassDefFoundError: 
> javax/xml/bind/annotation/XmlElement
> --
>
> Key: FLINK-28960
> URL: https://issues.apache.org/jira/browse/FLINK-28960
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.1, 1.14.6
>Reporter: Yufan Sheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.3, 1.14.7, 1.16.1
>
>
> {code:java}
> Unknown HK2 failure detected:
> MultiException stack 1 of 2
> java.lang.NoClassDefFoundError: javax/xml/bind/annotation/XmlElement
>   at 
> org.apache.pulsar.shade.com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector.(JaxbAnnotationIntrospector.java:137)
>   at 
> org.apache.pulsar.shade.com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector.(JaxbAnnotationIntrospector.java:124)
>   at 
> org.apache.pulsar.shade.com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector.(JaxbAnnotationIntrospector.java:116)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>   at java.base/java.lang.Class.newInstance(Class.java:584)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] syhily opened a new pull request, #21072: [FLINK-28960][Connector/Pulsar] Add jaxb-api back to pulsar-client-all dependencies.

2022-10-14 Thread GitBox


syhily opened a new pull request, #21072:
URL: https://github.com/apache/flink/pull/21072

   ## What is the purpose of the change
   
   `pulsar-client-all` requires jaxb-api to serialize some Admin API responses. We shouldn't exclude it from the current dependency list, so we just add it back to `flink-connector-pulsar`.
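   
   For reference, this implies a declaration along these lines in the connector's pom (version left to dependency management; treat as a sketch):
   
   ```xml
   <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
   </dependency>
   ```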
   
   ## Brief change log
   
   Same above.
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #403: [FLINK-29633] Pass fromSavepoint argument

2022-10-14 Thread GitBox


gyfora commented on PR #403:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/403#issuecomment-1279035826

   We should think of a way to test these reliably. Testing the parameters is not enough, as they might not be understood by the JM, as you pointed out.
   
   Maybe we need some e2e tests for standalone mode that exercise these params properly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-29401) Improve observer structure

2022-10-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-29401.
--
Resolution: Fixed

merged to main bca630c3a003149683b74a54ccd376f9a9691028

> Improve observer structure
> --
>
> Key: FLINK-29401
> URL: https://issues.apache.org/jira/browse/FLINK-29401
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.3.0
>
>
> The AbstractDeploymentObserver and SessionJobObserver at this point share a 
> lot of common logic due to the unification of other parts.
> We should factor out the common parts into an abstract base observer class.
> Furthermore, we should move the logic of the SavepointObserver into the
> JobStatusObserver, where it logically belongs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21070: [FLINK-29580][Connector/Pulsar] Remove pulsar.consumer.autoUpdatePartitionsIntervalSeconds option.

2022-10-14 Thread GitBox


flinkbot commented on PR #21070:
URL: https://github.com/apache/flink/pull/21070#issuecomment-1279033294

   
   ## CI report:
   
   * 757289c8fafe5f687f5772b7ce45d4f1cf7e5f55 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora merged pull request #400: [FLINK-29401] Refactor observer structure

2022-10-14 Thread GitBox


gyfora merged PR #400:
URL: https://github.com/apache/flink-kubernetes-operator/pull/400


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29433) Support Auth through the builder pattern in Pulsar connector

2022-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29433:
---
Labels: pull-request-available  (was: )

> Support Auth through the builder pattern in Pulsar connector
> 
>
> Key: FLINK-29433
> URL: https://issues.apache.org/jira/browse/FLINK-29433
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: Yufan Sheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, in order to use auth with the Flink connector, you need to do so through the {{setConfig}} method.
> It would be nice if, similar to the client API, we could add methods to the builder.
> Example:
> {code:java}
> builder.authentication(new AuthenticationToken(""))
> {code}
> We can do something similar for the connector instead of having to do:
> {code:java}
> PulsarSource.builder()
> .setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, 
> "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2")
> .setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, "{"privateKey":"..."})
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] syhily opened a new pull request, #21071: [FLINK-29433][Connector/Pulsar] Support Auth through the builder pattern in Pulsar connector.

2022-10-14 Thread GitBox


syhily opened a new pull request, #21071:
URL: https://github.com/apache/flink/pull/21071

   ## What is the purpose of the change
   
   Authentication is commonly used when consuming/producing messages between Pulsar and Flink. We didn't expose it through a builder method and relied on the common config options instead. Exposing the auth setting through a new `setAuthentication` method will simplify development for end users.
   
   ## Brief change log
   
   Add two new `setAuthentication` methods, one on `PulsarSinkBuilder` and one on `PulsarSourceBuilder`.
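   
   A usage example of what the new methods enable (the token value is a placeholder; other required builder settings are elided):
   
   ```java
   import org.apache.pulsar.client.impl.auth.AuthenticationToken;
   
   PulsarSource.builder()
           .setServiceUrl("pulsar://localhost:6650")
           .setAdminUrl("http://localhost:8080")
           .setAuthentication(new AuthenticationToken("my-token"));
   // ...set topics, subscription name and deserialization schema as usual
   ```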
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-29512) Align SubtaskCommittableManager checkpointId with CheckpointCommittableManagerImpl checkpointId during recovery

2022-10-14 Thread Fabian Paul (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Paul resolved FLINK-29512.
-
Fix Version/s: 1.17.0
   1.15.3
   1.16.1
   Resolution: Fixed

> Align SubtaskCommittableManager checkpointId with 
> CheckpointCommittableManagerImpl checkpointId during recovery
> ---
>
> Key: FLINK-29512
> URL: https://issues.apache.org/jira/browse/FLINK-29512
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.15.1, 1.16.0, 1.17.0
>Reporter: Fabian Paul
>Assignee: Fabian Paul
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> Similar to the issue described in https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of committables the subtaskCommittables checkpointId is always set to 1
> [https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193]
> while the holding CheckpointCommittableManager is initialized with the checkpointId that is written into state
> [https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155].
>  
> This means that during recovery the post-commit topology will receive a committable summary with the recovered checkpoint id and multiple `CommittableWithLineage`s with the reset checkpointId, causing orphaned `CommittableWithLineage`s without a `CommittableSummary` and failing the job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29512) Align SubtaskCommittableManager checkpointId with CheckpointCommittableManagerImpl checkpointId during recovery

2022-10-14 Thread Fabian Paul (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617734#comment-17617734
 ] 

Fabian Paul commented on FLINK-29512:
-

Merged into: 

 

master: d03c334b7ac954e70877a5bdb7b50cf54b50624c

release-1.16: 1bef1aeb6abd4edda0e178a6a5da7f6dc6c3b074

release-1.15: 126ae4df9ef8bab98a53433ef39c698cf8f04c60

> Align SubtaskCommittableManager checkpointId with 
> CheckpointCommittableManagerImpl checkpointId during recovery
> ---
>
> Key: FLINK-29512
> URL: https://issues.apache.org/jira/browse/FLINK-29512
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.15.1, 1.16.0, 1.17.0
>Reporter: Fabian Paul
>Assignee: Fabian Paul
>Priority: Critical
>  Labels: pull-request-available
>
> Similar to the issue described in https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of committables the subtaskCommittables checkpointId is always set to 1
> [https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193]
> while the holding CheckpointCommittableManager is initialized with the checkpointId that is written into state
> [https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155].
>  
> This means that during recovery the post-commit topology will receive a committable summary with the recovered checkpoint id and multiple `CommittableWithLineage`s with the reset checkpointId, causing orphaned `CommittableWithLineage`s without a `CommittableSummary` and failing the job.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fapaul merged pull request #21063: [BP-1.15][FLINK-29512][streaming] Align the checkpointId of recovered CommittableWithLinages with the CommittableSummary

2022-10-14 Thread GitBox


fapaul merged PR #21063:
URL: https://github.com/apache/flink/pull/21063


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fapaul merged pull request #21062: [BP-1.16][FLINK-29512][streaming] Align the checkpointId of recovered CommittableWithLinages with the CommittableSummary

2022-10-14 Thread GitBox


fapaul merged PR #21062:
URL: https://github.com/apache/flink/pull/21062


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] fapaul merged pull request #21022: [FLINK-29512][streaming] Align the checkpointId of recovered CommittableWithLinages with the CommittableSummary

2022-10-14 Thread GitBox


fapaul merged PR #21022:
URL: https://github.com/apache/flink/pull/21022


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29433) Support Auth through the builder pattern in Pulsar connector

2022-10-14 Thread Yufan Sheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yufan Sheng updated FLINK-29433:

Affects Version/s: 1.17.0
   (was: 1.16.0)

> Support Auth through the builder pattern in Pulsar connector
> 
>
> Key: FLINK-29433
> URL: https://issues.apache.org/jira/browse/FLINK-29433
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0
>Reporter: Yufan Sheng
>Priority: Minor
> Fix For: 1.17.0
>
>
> Currently, in order to use auth with the Flink connector, you need to do so through the {{setConfig}} method.
> It would be nice if, similar to the client API, we could add methods to the builder.
> Example:
> {code:java}
> builder.authentication(new AuthenticationToken(""))
> {code}
> We can do something similar for the connector instead of having to do:
> {code:java}
> PulsarSource.builder()
> .setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, 
> "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2")
> .setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, "{"privateKey":"..."})
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29580) pulsar.consumer.autoUpdatePartitionsIntervalSeconds doesn't work and should be removed

2022-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29580:
---
Labels: pull-request-available  (was: )

> pulsar.consumer.autoUpdatePartitionsIntervalSeconds doesn't work and should 
> be removed
> ---
>
> Key: FLINK-29580
> URL: https://issues.apache.org/jira/browse/FLINK-29580
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.0, 1.15.2, 1.16.1
>Reporter: Yufan Sheng
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] syhily opened a new pull request, #21070: [FLINK-29580][Connector/Pulsar] Remove pulsar.consumer.autoUpdatePartitionsIntervalSeconds option.

2022-10-14 Thread GitBox


syhily opened a new pull request, #21070:
URL: https://github.com/apache/flink/pull/21070

   ## What is the purpose of the change
   
   The config option `pulsar.consumer.autoUpdatePartitionsIntervalSeconds` in the Pulsar connector conflicts with Flink's split discovery logic and is useless. We should remove it.
   
   ## Brief change log
   
   Remove the 
`PulsarSourceOptions.PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS` field.
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29536) Add WATCH_NAMESPACES env var to kubernetes operator

2022-10-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-29536:
---
Fix Version/s: (was: kubernetes-operator-1.2.0)

> Add WATCH_NAMESPACES env var to kubernetes operator
> ---
>
> Key: FLINK-29536
> URL: https://issues.apache.org/jira/browse/FLINK-29536
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.2.0
>Reporter: Tony Garrard
>Priority: Major
>
> Provide the ability to set the namespaces watched by the operator using an 
> env var. Whilst the additional config can still be used, the presence of the 
> env var will take priority.
>  
> Reasons for issue
>  # The operator will pick up the setting immediately, as the pod will roll 
> (rather than waiting for the config to be refreshed)
>  # If the operator is to be OLM-bundled, we will be able to set the target 
> namespace using the following:
> {code:yaml}
> env:
>   - name: WATCHED_NAMESPACE
>     valueFrom:
>       fieldRef:
>         fieldPath: metadata.annotations['olm.targetNamespaces']
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29550) example "basic-checkpoint-ha.yaml" not working

2022-10-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-29550.
--
Resolution: Cannot Reproduce

The examples are mainly designed for minikube; other Kubernetes environments may 
require additional settings.
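
For example, on EKS the HA storage directory typically has to point to a shared 
filesystem such as S3 rather than a local path (a sketch; the bucket name is a 
placeholder):

{code:yaml}
flinkConfiguration:
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  high-availability.storageDir: s3://my-bucket/flink-ha
{code}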

> example "basic-checkpoint-ha.yaml" not working
> --
>
> Key: FLINK-29550
> URL: https://issues.apache.org/jira/browse/FLINK-29550
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
> Environment: * Kubernetes: EKS 1.22
>  * Node: bottlerocket linux
>  * Manifest: 
> https://github.com/apache/flink-kubernetes-operator/blob/release-1.1/examples/basic-checkpoint-ha.yaml
>Reporter: roa
>Priority: Minor
>
> Hi,
> I'm a Flink beginner, and I'm considering using the Kubernetes operator.
> Before using it, we are testing these features and examples.
> But when I tried to apply basic-checkpoint-ha.yaml, I faced the below error.
> {code:java}
> 2022-10-08 17:04:08,261 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
> The base directory of the JobResultStore isn't accessible. No dirty 
> JobResults can be restored.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) [?:?]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> [?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> [?:?]
>     at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.lang.IllegalStateException: The base directory of the 
> JobResultStore isn't accessible. No dirty JobResults can be restored.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:181)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     ... 4 more
> 2022-10-08 17:04:08,268 INFO  org.apache.flink.runtime.blob.BlobServer        
>              [] - Stopped BLOB server at 0.0.0.0:6124
> 2022-10-08 17:04:08,270 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
> KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. 
> Diagnostics Cluster entrypoint has been closed externally.. {code}
> Could you let me know why that error occurs?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29566) Reschedule the cleanup logic if cancel job failed

2022-10-14 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617730#comment-17617730
 ] 

Gyula Fora commented on FLINK-29566:


I think this improvement makes sense :) 

> Reschedule the cleanup logic if cancel job failed
> -
>
> Key: FLINK-29566
> URL: https://issues.apache.org/jira/browse/FLINK-29566
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Xin Hao
>Priority: Minor
>
> Currently, when we remove the FlinkSessionJob object,
> we always remove the object even if the Flink job was not canceled 
> successfully.
>  
> This is *not semantically consistent*: the FlinkSessionJob has been removed but 
> the Flink job is still running.
>  
> One scenario: we deploy a FlinkDeployment with HA mode.
> If we remove the FlinkSessionJob and change the FlinkDeployment at the same 
> time, or if the TMs are restarting because of bugs such as OOM,
> the cancellation of the Flink job will fail because the 
> TMs are not available.
>  
> We should *reschedule* the cleanup logic if the FlinkDeployment is present.
> And we can add a new ReconciliationState DELETING to indicate the 
> FlinkSessionJob's status.
>  
> The logic will be
> {code:java}
> if the FlinkDeployment is not present
>     delete the FlinkSessionJob object
> else
>     if the JM is not available
>         reschedule
>     else
>         if the job is canceled successfully
>             delete the FlinkSessionJob object
>         else
>             reschedule
> {code}
> When we cancel the Flink job, we need to verify that all jobs with the same 
> name have been deleted, in case the job id changed after the JM restarted.
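>  
> A rough sketch of the reschedule branch (method names are hypothetical; assumes 
> the Java Operator SDK's {{DeleteControl}} API used by the operator):
> {code:java}
> public DeleteControl cleanup(FlinkSessionJob sessionJob, Context<FlinkSessionJob> ctx) {
>     if (!flinkDeploymentPresent(sessionJob, ctx)) {
>         return DeleteControl.defaultDelete();
>     }
>     if (!jobManagerAvailable(sessionJob, ctx) || !cancelJobSuccessfully(sessionJob)) {
>         // keep the finalizer so the resource stays, and retry the cleanup later
>         return DeleteControl.noFinalizerRemoval().rescheduleAfter(Duration.ofSeconds(10));
>     }
>     return DeleteControl.defaultDelete();
> }
> {code}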
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29550) example "basic-checkpoint-ha.yaml" not working

2022-10-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-29550:
---
Affects Version/s: (was: 1.15.0)

> example "basic-checkpoint-ha.yaml" not working
> --
>
> Key: FLINK-29550
> URL: https://issues.apache.org/jira/browse/FLINK-29550
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
> Environment: * Kubernetes: EKS 1.22
>  * Node: bottlerocket linux
>  * Manifest: 
> https://github.com/apache/flink-kubernetes-operator/blob/release-1.1/examples/basic-checkpoint-ha.yaml
>Reporter: roa
>Priority: Minor
>
> Hi,
> I'm a Flink beginner, and I'm considering using the Kubernetes operator.
> Before using it, we are testing these features and examples.
> But when I tried to apply basic-checkpoint-ha.yaml, I faced the below error.
> {code:java}
> 2022-10-08 17:04:08,261 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
> The base directory of the JobResultStore isn't accessible. No dirty 
> JobResults can be restored.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) [?:?]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> [?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> [?:?]
>     at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.lang.IllegalStateException: The base directory of the 
> JobResultStore isn't accessible. No dirty JobResults can be restored.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:181)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184)
>  ~[flink-dist-1.15.2.jar:1.15.2]
>     ... 4 more
> 2022-10-08 17:04:08,268 INFO  org.apache.flink.runtime.blob.BlobServer        
>              [] - Stopped BLOB server at 0.0.0.0:6124
> 2022-10-08 17:04:08,270 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
> KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. 
> Diagnostics Cluster entrypoint has been closed externally.. {code}
> Could you let me know why that error occurs?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29645) BatchExecutionKeyedStateBackend is using incorrect ExecutionConfig when creating serializer

2022-10-14 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-29645:
--

 Summary: BatchExecutionKeyedStateBackend is using incorrect 
ExecutionConfig when creating serializer
 Key: FLINK-29645
 URL: https://issues.apache.org/jira/browse/FLINK-29645
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.14.6, 1.15.2, 1.13.6, 1.12.7, 1.16.0, 1.17.0
Reporter: Piotr Nowojski


{{org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend#getOrCreateKeyedState}}
 is using a freshly constructed {{ExecutionConfig}} instead of the one 
configured by the user in the environment.


{code:java}
public <N, S extends State, V> S getOrCreateKeyedState(
        TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
        throws Exception {
    checkNotNull(namespaceSerializer, "Namespace serializer");
    checkNotNull(
            keySerializer,
            "State key serializer has not been configured in the config. "
                    + "This operation cannot use partitioned state.");

    if (!stateDescriptor.isSerializerInitialized()) {
        stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
    }
{code}

The correct one could be obtained from {{env.getExecutionConfig()}} in 
{{org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend#createKeyedStateBackend}}
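
A minimal sketch of the fix (assuming the execution config is simply threaded 
through from the environment; the constructor shape is an assumption):

{code:java}
// in BatchExecutionStateBackend#createKeyedStateBackend (sketch):
return new BatchExecutionKeyedStateBackend<>(
        keySerializer, keyGroupRange, env.getExecutionConfig());

// in BatchExecutionKeyedStateBackend#getOrCreateKeyedState (sketch):
if (!stateDescriptor.isSerializerInitialized()) {
    stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
{code}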
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering

2022-10-14 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617729#comment-17617729
 ] 

Gyula Fora commented on FLINK-29634:


I think savepoint history is mostly important so that the operator can do the 
cleanup; the operator should not track checkpoints that are otherwise owned 
and cleaned up by Flink itself.

The implementation could be similar to but simpler than savepointing, because 
it should be enough to trigger the checkpoint; we don't need to keep track of 
its completion.

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

2022-10-14 Thread GitBox


flinkbot commented on PR #21069:
URL: https://github.com/apache/flink/pull/21069#issuecomment-1278989774

   
   ## CI report:
   
   * 7d89bb64116b99d895edf1df5cba443715b1f710 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29644) Reference Kubernetes operator from Flink Kubernetes deploy docs

2022-10-14 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-29644:
--

 Summary: Reference Kubernetes operator from Flink Kubernetes 
deploy docs
 Key: FLINK-29644
 URL: https://issues.apache.org/jira/browse/FLINK-29644
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes, Documentation
Affects Versions: 1.15.2, 1.16.0, 1.17.0
Reporter: Gyula Fora


Currently the Flink deployment/resource provider docs provide some information 
for the Standalone and Native Kubernetes integration without any reference to 
the operator. 

We should provide a bit more visibility and value to the users by directly 
proposing to use the operator when considering Flink on Kubernetes. 

We should make the point that for most users the easiest way to use Flink on 
Kubernetes is probably through the operator (where they can now benefit from 
both standalone and native integration under the hood). This should help us 
avoid cases where a new user completely misses the existence of the operator 
when starting out based on the Flink docs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29613) Wrong message size assertion in Pulsar's batch message

2022-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29613:
---
Labels: pull-request-available  (was: )

> Wrong message size assertion in Pulsar's batch message
> --
>
> Key: FLINK-29613
> URL: https://issues.apache.org/jira/browse/FLINK-29613
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.16.0, 1.17.0, 1.15.2
>Reporter: qiaomengnan
>Priority: Major
>  Labels: pull-request-available
>
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
> at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
> at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
> at 
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:109)
> at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received 
> unexpected exception while polling the records
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> Suppressed: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
> ... 7 more
> Caused by: java.lang.IllegalArgumentException: We only support normal message 
> id currently.
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId(MessageIdUtils.java:61)
> at 
> org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId(MessageIdUtils.java:43)
> at 
> org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.beforeCreatingConsumer(PulsarOrderedPartitionSplitReader.java:94)
> at 
> org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.handleSplitsChanges(PulsarPartitionSplitReaderBase.java:160)
> at 
> org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.handleSplitsChanges(PulsarOrderedPartitionSplitReader.java:52)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> ... 6 more
> Caused by: java.lang.IllegalArgumentException: We only support normal message 
> id currently.
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId(MessageIdUtils.java:61)
> at 
> org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId(MessageIdUtils.java:43)
> at 
> org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader.beforeCreatingConsumer(PulsarOrderedPartitionSplitReader.java:94)
> at 
> 

[GitHub] [flink] syhily opened a new pull request, #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

2022-10-14 Thread GitBox


syhily opened a new pull request, #21069:
URL: https://github.com/apache/flink/pull/21069

   ## What is the purpose of the change
   
   The Pulsar connector performs a wrong batch message size assertion: we require 
every batch message to have size 1, but sometimes the size can be 0 due 
to serialization and deserialization. So we should change the assertion 
logic.
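   
   A rough sketch of the adjusted check (assumes Pulsar's `BatchMessageIdImpl` 
accessors; the real change lives in `MessageIdUtils`):
   
   ```java
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.impl.BatchMessageIdImpl;
   import org.apache.pulsar.client.impl.MessageIdImpl;
   
   import static org.apache.flink.util.Preconditions.checkArgument;
   
   // sketch: accept a batch size of 0 or 1 instead of requiring exactly 1
   public static MessageId unwrapMessageId(MessageId messageId) {
       if (messageId instanceof BatchMessageIdImpl) {
           BatchMessageIdImpl id = (BatchMessageIdImpl) messageId;
           checkArgument(id.getBatchSize() <= 1, "We only support normal message id currently.");
           return new MessageIdImpl(id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
       }
       return messageId;
   }
   ```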
   
   ## Brief change log
   
   Change the implementation in `MessageIdUtils.unwrapMessageId(MessageId)`.
   
   ## Verifying this change
   
   This change is a trivial fix without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-29632) Support nodeSelector in helm template for flink operator deployment

2022-10-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-29632.
--
Fix Version/s: kubernetes-operator-1.3.0
   Resolution: Fixed

merged to main 8e6ec37619992370835d5d632fa1e384c11c6143

> Support nodeSelector in helm template for flink operator deployment
> ---
>
> Key: FLINK-29632
> URL: https://issues.apache.org/jira/browse/FLINK-29632
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Jeesmon Jacob
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.3.0
>
>
> Currently the Helm chart of flink-kubernetes-operator doesn't allow adding a 
> nodeSelector to the operator deployment. There are cases where we want to 
> schedule the operator to specific nodes, and it would be great if the Helm 
> chart could support it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #402: [FLINK-29632] Support nodeSelector in helm template for flink operator deployment

2022-10-14 Thread GitBox


gyfora merged PR #402:
URL: https://github.com/apache/flink-kubernetes-operator/pull/402


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995726266


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 2", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(100),
+                Types.LONG);
+```
+
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   > I assume that snapshotState(), notifyCheckpointComplete() and pollNext() 
are all called in the same thread?
   
   Yes, that should be the case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29643) Possible NPE in ApplicationDispatcherBootstrap with failedJob submission and no HA

2022-10-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-29643:
---

 Summary: Possible NPE in ApplicationDispatcherBootstrap with 
failedJob submission and no HA
 Key: FLINK-29643
 URL: https://issues.apache.org/jira/browse/FLINK-29643
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.15.2, 1.16.0, 1.17.0
Reporter: Nico Kruber


If
- {{PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID}} is not set, and
- high availability is not activated, and
- {{DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR}} is set,
then a failed job submission may result in an NPE, since the code in 
{{ApplicationDispatcherBootstrap#runApplicationEntryPoint()}} tries 
to read the {{failedJobId}} from the configuration, where it will not be 
present in these cases.

Please refer to the conditions that set the {{jobId}} in 
{{ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync()}}.
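
A minimal sketch of a defensive fix (illustrative only; {{ZERO_JOB_ID}} is an 
existing constant in {{ApplicationDispatcherBootstrap}}, but whether it is the 
right fallback here is an assumption):

{code:java}
// sketch: derive the failed job id the same way
// fixJobIdAndRunApplicationAsync() does, instead of assuming
// PIPELINE_FIXED_JOB_ID is always present
final JobID failedJobId =
        configuration
                .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
                .map(JobID::fromHexString)
                .orElse(ZERO_JOB_ID);
{code}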



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] fapaul commented on pull request #21022: [FLINK-29512][streaming] Align the checkpointId of recovered CommittableWithLinages with the CommittableSummary

2022-10-14 Thread GitBox


fapaul commented on PR #21022:
URL: https://github.com/apache/flink/pull/21022#issuecomment-1278933763

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29634) Support periodic checkpoint triggering

2022-10-14 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617676#comment-17617676
 ] 

Thomas Weise commented on FLINK-29634:
--

[~Jiale] that PR is the right place to look at wrt how periodic savepoint 
triggering was added. Please note, though, that a good portion of it also 
relates to savepoint history. The operator does not maintain checkpoint 
history as checkpoints (so far) are triggered by Flink internally. Although it 
may be ultimately good to also keep track of checkpoints that were triggered by 
the operator within the CR status, perhaps it is best if we start with just the 
periodic triggering support? WDYT [~gyfora] ?

Please note that in order to work on this, the operator first needs to 
recognize Flink version 1.17 (currently it supports up to 1.16, see 
FlinkVersion). Then this feature needs to be built so that it is only effective 
for 1.17+ and operator keeps working with the other versions.
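
A tiny sketch of that first step (the enum values below reflect the operator's 
current {{FlinkVersion}}; adding {{v1_17}} is the assumed change):

{code:java}
public enum FlinkVersion {
    v1_13,
    v1_14,
    v1_15,
    v1_16,
    // new: required before any 1.17-only feature can be gated on it
    v1_17;
}
{code}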

 

> Support periodic checkpoint triggering
> --
>
> Key: FLINK-29634
> URL: https://issues.apache.org/jira/browse/FLINK-29634
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Thomas Weise
>Assignee: Jiale Tan
>Priority: Major
>
> Similar to the support for periodic savepoints, the operator should support 
> triggering periodic checkpoints to break the incremental checkpoint chain.
> Support for external triggering will come with 1.17: 
> https://issues.apache.org/jira/browse/FLINK-27101 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-13876) Remove ExecutionConfig field from PojoSerializer

2022-10-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617675#comment-17617675
 ] 

Piotr Nowojski commented on FLINK-13876:


I'm not sure, but if this is really an issue that's not solved by 
{{PojoSerializerSnapshot}} or {{PojoSerializerConfigSnapshot}}, then 
FLINK-19084 would be a breaking change already. 

> Remove ExecutionConfig field from PojoSerializer
> 
>
> Key: FLINK-13876
> URL: https://issues.apache.org/jira/browse/FLINK-13876
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.7.2, 1.8.1, 1.9.0, 1.10.0
>Reporter: Dawid Wysakowicz
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.17.0
>
>
> The PojoSerializer stores an instance of ExecutionConfig as an internal field, 
> even though the only information it may ever need is the registered 
> Kryo serializers.
> This has a few drawbacks:
> * It blocks the evolution of {{ExecutionConfig}}, as serializers were stored 
> in state. Therefore any change to ExecutionConfig must be backwards 
> compatible with respect to Java serialization
> * It probably already introduced a bug, as upon restore the Snapshot actually 
> recreates the serializer with an empty ExecutionConfig (see 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot#restoreSerializer)
> I suggest to remove the field completely and adjust corresponding usages.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol merged pull request #21057: [hotfix] [docs] correct grammar

2022-10-14 Thread GitBox


zentol merged PR #21057:
URL: https://github.com/apache/flink/pull/21057


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol closed pull request #20997: fix(sec): upgrade org.apache.hive:hive-exec to 3.1.3

2022-10-14 Thread GitBox


zentol closed pull request #20997: fix(sec): upgrade org.apache.hive:hive-exec 
to 3.1.3
URL: https://github.com/apache/flink/pull/20997


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #21064: [FLINK-29638][connectors][filesystems][formats] Update Jackson-BOM to 2.13.4.2 because of CVE-2022-42003

2022-10-14 Thread GitBox


snuyanzin commented on PR #21064:
URL: https://github.com/apache/flink/pull/21064#issuecomment-1278910720

   @flinkbot run azure
   
   failure seems related to https://issues.apache.org/jira/browse/FLINK-29387


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #21056: [FLINK-29379][datastream] Example of using Configuration to store ExecutionConfig fields

2022-10-14 Thread GitBox


zentol commented on code in PR #21056:
URL: https://github.com/apache/flink/pull/21056#discussion_r995678770


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -148,6 +148,19 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {
+    private static final ConfigOption<GlobalJobParameters> GLOBAL_JOB_PARAMETERS_EXECUTION_CONFIG =
+            key(PipelineOptions.GLOBAL_JOB_PARAMETERS.key())
+                    .defaultValue(new GlobalJobParameters());

Review Comment:
   I guess this is also required for the Kryo serializers and the 
`restartStrategyConfiguration`.
   
   I nevertheless like the change in general though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


afedulov commented on PR #20757:
URL: https://github.com/apache/flink/pull/20757#issuecomment-1278903252

   @zentol I modified the docs according to your proposals, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #21056: [FLINK-29379][datastream] Example of using Configuration to store ExecutionConfig fields

2022-10-14 Thread GitBox


zentol commented on code in PR #21056:
URL: https://github.com/apache/flink/pull/21056#discussion_r995676246


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -756,7 +811,27 @@ public GlobalJobParameters getGlobalJobParameters() {
  */
 public void setGlobalJobParameters(GlobalJobParameters 
globalJobParameters) {
 Preconditions.checkNotNull(globalJobParameters, "globalJobParameters 
shouldn't be null");
-this.globalJobParameters = globalJobParameters;
+configuration.set(
+GLOBAL_JOB_PARAMETERS_EXECUTION_CONFIG, 
convertToString(globalJobParameters));
+}
+
+private static String convertToString(final Serializable object) {
+try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+oos.writeObject(object);
+return Base64.getEncoder().encodeToString(baos.toByteArray());
+} catch (final IOException e) {
+throw new IllegalStateException(e);
+}
+}
+
+private static <T> T convertFrom(final String objectAsString) {
+final byte[] data = Base64.getDecoder().decode(objectAsString);
+try (final ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(data))) {
+return (T) ois.readObject();

Review Comment:
   This won't work if the GlobalJobParameters are a user-defined class. You'll 
need to somehow provide the EC with the user-code classloader.
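   
   One possible direction (a sketch, assuming the user-code class loader can be 
passed in; not the actual PR code):
   
   ```java
   private static <T> T convertFrom(final String objectAsString, final ClassLoader userCodeClassLoader)
           throws IOException, ClassNotFoundException {
       final byte[] data = Base64.getDecoder().decode(objectAsString);
       // resolve classes against the user-code class loader during deserialization
       try (ObjectInputStream ois =
               new ObjectInputStream(new ByteArrayInputStream(data)) {
                   @Override
                   protected Class<?> resolveClass(ObjectStreamClass desc)
                           throws IOException, ClassNotFoundException {
                       return Class.forName(desc.getName(), false, userCodeClassLoader);
                   }
               }) {
           @SuppressWarnings("unchecked")
           final T value = (T) ois.readObject();
           return value;
       }
   }
   ```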



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


afedulov commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995674703


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of

Review Comment:
   Added.



##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


afedulov commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995674573


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(100),
+                Types.LONG);
+```
+
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 
+corresponding 
+{{< javadoc name="RateLimiterStrategy" 
file="org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.html">}}.
 
+This is particularly useful for testing scenarios where certain output
+is expected to be produced upon checkpoint completions. The below snippet 
illustrates an example of 
+producing the sequence of elements `"a","b", .. ,"j"` repeatedly between 
checkpoints:
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(3000);
+env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

Review Comment:
   Added a separate Boundedness section.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


afedulov commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995674275


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(100),
+                Types.LONG);
+```
+
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 
+corresponding 
+{{< javadoc name="RateLimiterStrategy" 
file="org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.html">}}.
 
+This is particularly useful for testing scenarios where certain output
+is expected to be produced upon checkpoint completions. The below snippet 
illustrates an example of 
+producing the sequence of elements `"a","b", .. ,"j"` repeatedly between 
checkpoints:
+
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(3000);
+env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+env.setParallelism(1);

Review Comment:
   The example got removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


afedulov commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995671986


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(100),
+                Types.LONG);
+```
+
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   What is the threading model (unfortunately a proper FLIP for the Mailbox 
implementation is missing and FLIP-27 does not go into much detail either)? I 
assume that `snapshotState()`, `notifyCheckpointComplete()` and `pollNext()` 
are all called in the same thread?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-29619) Remove redundant MeterView updater thread from KubernetesClientMetrics

2022-10-14 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-29619.
--
Resolution: Fixed

merged to main f1387f723da3f274bcb3268d401d91042064255d

> Remove redundant MeterView updater thread from KubernetesClientMetrics
> --
>
> Key: FLINK-29619
> URL: https://issues.apache.org/jira/browse/FLINK-29619
> Project: Flink
>  Issue Type: Bug
>Affects Versions: kubernetes-operator-1.2.0
>Reporter: Matyas Orhidi
>Assignee: Matyas Orhidi
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.3.0
>
>
> The `MetricRegistryImpl` already has a solution to update `MeterView` objects 
> periodically.
> https://github.com/apache/flink/blob/7a509c46e45b9a91f2b7d01f13afcdef266b1faf/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java#L404



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995657759


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(100),
+                Types.LONG);
+```
+
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   > Do we actually have such examples?
   
   I haven't seen one; but they may exist :sweat_smile: 
   
   > Is this something that could be compared with the example that Steven 
shared 
[apache/iceberg/flink/source/BoundedTestSource.java#L70](https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java#L70)
 where data is emitted while holding the checkpointLock ?
   
   Exactly. There they control the _exact_ contents of each checkpoint.
   
   > It seems, though, that it should not prevent the new Source from being 
used as a replacement for the BoundedTestSource mentioned above with such 
reasonable settings, what do you think?
   
   Yes-ish; provided that the checkpoint interval is large enough, there is a reasonably high chance that it won't be an issue.
   
   Can the new sources even block checkpointing? I guess the only way to do 
that is by emitting multiple values in `pollNext`. But I'm not sure if there 
even is a guarantee that pollNext is called at least once between checkpoints.
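For reference, a minimal sketch of the legacy pattern being discussed, in the spirit of `BoundedTestSource`; the class name and batch layout here are illustrative, not an existing Flink or Iceberg API. Each batch is emitted while holding the checkpoint lock, then the source waits for a completed checkpoint before emitting the next batch:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative only: emit one batch per checkpoint by holding the checkpoint
// lock while emitting, then wait for checkpoint completion before continuing.
public class PerCheckpointSource<T> implements SourceFunction<T>, CheckpointListener {

    private final List<List<T>> batches; // one batch per checkpoint
    private final AtomicInteger completedCheckpoints = new AtomicInteger();
    private volatile boolean running = true;

    public PerCheckpointSource(List<List<T>> batches) {
        this.batches = batches;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        for (List<T> batch : batches) {
            if (!running) {
                break;
            }
            // No checkpoint barrier can be injected while the lock is held,
            // so the whole batch lands on the same side of the next barrier.
            synchronized (ctx.getCheckpointLock()) {
                for (T element : batch) {
                    ctx.collect(element);
                }
            }
            // Block until some checkpoint completes after the batch was emitted.
            int target = completedCheckpoints.get() + 1;
            while (running && completedCheckpoints.get() < target) {
                Thread.sleep(10);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        completedCheckpoints.incrementAndGet();
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```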



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora merged pull request #401: [FLINK-29619] Remove redundant MeterView updater thread from KubernetesClientMetrics

2022-10-14 Thread GitBox


gyfora merged PR #401:
URL: https://github.com/apache/flink-kubernetes-operator/pull/401


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29641) SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader

2022-10-14 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17617651#comment-17617651
 ] 

Weijie Guo commented on FLINK-29641:


[~mapohl] Thank you for your report and investigation, I will take a look.

> SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader
> --
>
> Key: FLINK-29641
> URL: https://issues.apache.org/jira/browse/FLINK-29641
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> [This 
> build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42011=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=8433]
>  failed (not exclusively) due to 
> {{SortMergeResultPartitionReadSchedulerTest.testCreateSubpartitionReader}}. 
> The assert checking that the {{SortedMergeSubpartitionReader}} is in running 
> state fails.
> My suspicion is that the condition in 
> [SortMergeResultPartitionReadScheduler.mayTriggerReading|https://github.com/apache/flink/blob/87d4f70e49255b551d0106117978b1aa0747358c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java#L425-L428]
>  (or something related to that condition) needs to be reconsidered since 
> that's the only time {{isRunning}} is actually set to true.
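A minimal sketch of how the test could tolerate a merely delayed state transition, assuming a test-visible isRunning() accessor on the scheduler; CommonTestUtils.waitUtil is an existing Flink test utility. This would only help if isRunning is eventually set, not if the condition in mayTriggerReading prevents the transition entirely:
{code:java}
import java.time.Duration;

import org.apache.flink.core.testutils.CommonTestUtils;

// Poll for the state transition instead of asserting immediately after
// creating the subpartition reader. readScheduler.isRunning() is an
// assumed accessor, used here for illustration.
CommonTestUtils.waitUtil(
        readScheduler::isRunning,
        Duration.ofSeconds(10),
        "SortMergeResultPartitionReadScheduler did not reach running state.");
{code}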



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] klion26 commented on a diff in pull request #21050: [FLINK-29095][state] Improve logging in SharedStateRegistry

2022-10-14 Thread GitBox


klion26 commented on code in PR #21050:
URL: https://github.com/apache/flink/pull/21050#discussion_r995430467


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -95,60 +95,78 @@ public StreamStateHandle registerReference(
 entry = registeredStates.get(registrationKey);
 
 if (entry == null) {
-// Additional check that should never fail, because only state 
handles that are not
-// placeholders should
-// ever be inserted to the registry.
 checkState(
-!isPlaceholder(state),
+!isPlaceholder(newHandle),
 "Attempt to reference unknown state: " + 
registrationKey);
 
-entry = new SharedStateEntry(state, checkpointID);
+LOG.trace(
+"Registered new shared state {} under key {}.", 
newHandle, registrationKey);
+entry = new SharedStateEntry(newHandle, checkpointID);
 registeredStates.put(registrationKey, entry);
-LOG.trace("Registered new shared state {} under key {}.", 
entry, registrationKey);
 
-} else {
-// Delete if this is a real duplicate.
-// Note that task (backend) is not required to re-upload state
-// if the confirmation notification was missing.
-// However, it's also not required to use exactly the same 
handle or placeholder
-if (!Objects.equals(state, entry.stateHandle)) {
-if (entry.confirmed || isPlaceholder(state)) {
-scheduledStateDeletion = state;
-} else {
-// Old entry is not in a confirmed checkpoint yet, and 
the new one differs.
-// This might result from (omitted KG range here for 
simplicity):
-// 1. Flink recovers from a failure using a checkpoint 
1
-// 2. State Backend is initialized to UID xyz and a 
set of SST: { 01.sst }
-// 3. JM triggers checkpoint 2
-// 4. TM sends handle: "xyz-002.sst"; JM registers it 
under "xyz-002.sst"
-// 5. TM crashes; everything is repeated from (2)
-// 6. TM recovers from CP 1 again: backend UID "xyz", 
SST { 01.sst }
-// 7. JM triggers checkpoint 3
-// 8. TM sends NEW state "xyz-002.sst"
-// 9. JM discards it as duplicate
-// 10. checkpoint completes, but a wrong SST file is 
used
-// So we use a new entry and discard the old one:
-scheduledStateDeletion = entry.stateHandle;
-entry.stateHandle = state;
-}
-LOG.trace(
-"Identified duplicate state registration under key 
{}. New state {} was determined to "
-+ "be an unnecessary copy of existing 
state {} and will be dropped.",
-registrationKey,
-state,
-entry.stateHandle);
-}
+// no further handling
+return entry.stateHandle;
+
+} else if (entry.stateHandle == newHandle) {
+// might be a bug but state backend is not required to use a 
place-holder
+LOG.debug(
+"Duplicated registration under key {} with the same 
object: {}",
+registrationKey,
+newHandle);
+} else if (Objects.equals(entry.stateHandle, newHandle)) {

Review Comment:
   It seems this block can also cover the logic in L110 - L115 above, since reference equality implies `Objects.equals`.
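   For reference, a tiny standalone demo of why the reference-equality branch is subsumed (not registry code, just the `java.util.Objects` contract):
   
   ```java
   import java.util.Objects;
   
   public class ObjectsEqualsDemo {
       public static void main(String[] args) {
           Object handle = new Object();
           // Objects.equals(a, b) evaluates (a == b) || (a != null && a.equals(b)),
           // so identical references always compare equal.
           System.out.println(Objects.equals(handle, handle)); // true
       }
   }
   ```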



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -95,60 +95,78 @@ public StreamStateHandle registerReference(
 entry = registeredStates.get(registrationKey);
 
 if (entry == null) {
-// Additional check that should never fail, because only state 
handles that are not
-// placeholders should
-// ever be inserted to the registry.
 checkState(
-!isPlaceholder(state),
+!isPlaceholder(newHandle),
 "Attempt to reference unknown state: " + 
registrationKey);
 
-entry = new SharedStateEntry(state, checkpointID);
+LOG.trace(
+"Registered new shared state {} under key {}.", 
newHandle, registrationKey);
+entry = new SharedStateEntry(newHandle, checkpointID);
 registeredStates.put(registrationKey, entry);
-

[GitHub] [flink] afedulov commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


afedulov commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995629891


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 2", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+                generatorFunction,
+                Long.MAX_VALUE,
+                RateLimiterStrategy.perSecond(100),
+                Types.LONG);
+```
+
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   >hmm this also implies that not all GeneratorFunctions from the old source 
can be migrated; anything that changes data on checkpoint can't be replicated.
   
   Are you referring to 
`org.apache.flink.streaming.api.functions.source.datagen.DataGenerator`? Do we 
actually have such examples?
   
   >Actually, maybe remove the deterministic checkpointing bit. Technically it 
isn't guaranteed to be deterministic; ...
   
   Is this something that could be compared with the example that Steven shared 
[apache/iceberg/flink/source/BoundedTestSource.java#L70](https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java#L70)
 where data is emitted while holding the `checkpointLock` ?
   
   I guess from a practical perspective it boils down to having a reasonable 
relation between the number of emitted records vs the checkpoint duration 
(`pollNext()` vs `notifyCheckpointComplete()`), but you are right, there is no 
guarantee per se :confused: It seems, though, that it should not prevent the 
new `Source` from being used as a replacement for the `BoundedTestSource` 
mentioned above with such reasonable settings, what do you think?
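   For what it's worth, a minimal sketch of a bounded per-checkpoint setup with the new source, assuming the `RateLimiterStrategy.perCheckpoint` factory is part of this PR's `RateLimiterStrategy`; a cap of N records per checkpoint bounds, but does not pin down, the exact contents of each checkpoint:
   
   ```java
   GeneratorFunction<Long, Long> generatorFunction = index -> index;
   
   // At most 100 records are emitted between two consecutive checkpoint
   // barriers, which bounds the contents of each checkpoint.
   DataGeneratorSource<Long> source =
           new DataGeneratorSource<>(
                   generatorFunction,
                   1000,
                   RateLimiterStrategy.perCheckpoint(100),
                   Types.LONG);
   ```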
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


