[GitHub] [flink] flinkbot edited a comment on pull request #18966: [FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs
flinkbot edited a comment on pull request #18966: URL: https://github.com/apache/flink/pull/18966#issuecomment-1057770962 ## CI report: * 3968c560b0057d50f6f5ab312320f3453915d015 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32471) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #18966: [FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs
flinkbot commented on pull request #18966: URL: https://github.com/apache/flink/pull/18966#issuecomment-1057770962 ## CI report: * 3968c560b0057d50f6f5ab312320f3453915d015 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #18838: [FLINK-26177][Connector/pulsar] Use testcontainer pulsar runtime instead o…
flinkbot edited a comment on pull request #18838: URL: https://github.com/apache/flink/pull/18838#issuecomment-1044141081 ## CI report: * b737720f52bd34e5e9bde6e0e87f45098f8ad27d Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32467) * 114fce453eb7b291e0fdde65ff8e9cbd2cc8aa9a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32470)
[jira] [Commented] (FLINK-25771) CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP
[ https://issues.apache.org/jira/browse/FLINK-25771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500562#comment-17500562 ]

Yun Gao commented on FLINK-25771:

Same for {{CassandraConnectorITCase.testScalingUp}}:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32459=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=12569

> CassandraConnectorITCase.testRetrialAndDropTables timeouts on AZP
>
> Key: FLINK-25771
> URL: https://issues.apache.org/jira/browse/FLINK-25771
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Cassandra
> Affects Versions: 1.15.0, 1.13.5, 1.14.3
> Reporter: Till Rohrmann
> Assignee: Etienne Chauchot
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.15.0, 1.13.7, 1.14.5
>
> The test {{CassandraConnectorITCase.testRetrialAndDropTables}} fails on AZP with
> {code}
> Jan 23 01:02:52 com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /172.17.0.1:59220 (com.datastax.driver.core.exceptions.OperationTimedOutException: [/172.17.0.1] Timed out waiting for server response))
> Jan 23 01:02:52 at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
> Jan 23 01:02:52 at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
> Jan 23 01:02:52 at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> Jan 23 01:02:52 at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> Jan 23 01:02:52 at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
> Jan 23 01:02:52 at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
> Jan 23 01:02:52 at org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase.testRetrialAndDropTables(CassandraConnectorITCase.java:554)
> Jan 23 01:02:52 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Jan 23 01:02:52 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jan 23 01:02:52 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jan 23 01:02:52 at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 23 01:02:52 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Jan 23 01:02:52 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jan 23 01:02:52 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Jan 23 01:02:52 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jan 23 01:02:52 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52 at org.apache.flink.testutils.junit.RetryRule$RetryOnExceptionStatement.evaluate(RetryRule.java:196)
> Jan 23 01:02:52 at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jan 23 01:02:52 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> Jan 23 01:02:52 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23 01:02:52 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> Jan 23 01:02:52 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> Jan 23 01:02:52 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> Jan 23 01:02:52 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> Jan 23 01:02:52 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> Jan 23 01:02:52 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> Jan 23 01:02:52 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> Jan 23 01:02:52 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> Jan 23 01:02:52 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Jan 23 01:02:52 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> Jan 23 01:02:52 at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
> Jan 23 01:02:52 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Jan 23
[jira] [Commented] (FLINK-25813) TableITCase.testCollectWithClose failed on azure
[ https://issues.apache.org/jira/browse/FLINK-25813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500560#comment-17500560 ]

Yun Gao commented on FLINK-25813:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32459=logs=de826397-1924-5900-0034-51895f69d4b7=f311e913-93a2-5a37-acab-4a63e1328f94=10369

> TableITCase.testCollectWithClose failed on azure
>
> Key: FLINK-25813
> URL: https://issues.apache.org/jira/browse/FLINK-25813
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.15.0
> Reporter: Yun Gao
> Assignee: Caizhi Weng
> Priority: Major
> Labels: auto-deprioritized-critical, pull-request-available, test-stability
>
> {code:java}
> 2022-01-25T08:35:25.3735884Z Jan 25 08:35:25 [ERROR] TableITCase.testCollectWithClose Time elapsed: 0.377 s <<< FAILURE!
> 2022-01-25T08:35:25.3737127Z Jan 25 08:35:25 java.lang.AssertionError: Values should be different. Actual: RUNNING
> 2022-01-25T08:35:25.3738167Z Jan 25 08:35:25 at org.junit.Assert.fail(Assert.java:89)
> 2022-01-25T08:35:25.3739085Z Jan 25 08:35:25 at org.junit.Assert.failEquals(Assert.java:187)
> 2022-01-25T08:35:25.3739922Z Jan 25 08:35:25 at org.junit.Assert.assertNotEquals(Assert.java:163)
> 2022-01-25T08:35:25.3740846Z Jan 25 08:35:25 at org.junit.Assert.assertNotEquals(Assert.java:177)
> 2022-01-25T08:35:25.3742302Z Jan 25 08:35:25 at org.apache.flink.table.api.TableITCase.testCollectWithClose(TableITCase.scala:135)
> 2022-01-25T08:35:25.3743327Z Jan 25 08:35:25 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-01-25T08:35:25.3744343Z Jan 25 08:35:25 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-01-25T08:35:25.3745575Z Jan 25 08:35:25 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-01-25T08:35:25.3746840Z Jan 25 08:35:25 at java.lang.reflect.Method.invoke(Method.java:498)
> 2022-01-25T08:35:25.3747922Z Jan 25 08:35:25 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-01-25T08:35:25.3749151Z Jan 25 08:35:25 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-01-25T08:35:25.3750422Z Jan 25 08:35:25 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-01-25T08:35:25.3751820Z Jan 25 08:35:25 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-01-25T08:35:25.3753196Z Jan 25 08:35:25 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-01-25T08:35:25.3754253Z Jan 25 08:35:25 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-01-25T08:35:25.3755441Z Jan 25 08:35:25 at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
> 2022-01-25T08:35:25.3756656Z Jan 25 08:35:25 at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-01-25T08:35:25.3757778Z Jan 25 08:35:25 at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-01-25T08:35:25.3758821Z Jan 25 08:35:25 at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-01-25T08:35:25.3759840Z Jan 25 08:35:25 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-01-25T08:35:25.3760919Z Jan 25 08:35:25 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-01-25T08:35:25.3762249Z Jan 25 08:35:25 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-01-25T08:35:25.3763322Z Jan 25 08:35:25 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-01-25T08:35:25.3764436Z Jan 25 08:35:25 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-01-25T08:35:25.3765907Z Jan 25 08:35:25 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-01-25T08:35:25.3766957Z Jan 25 08:35:25 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-01-25T08:35:25.3768104Z Jan 25 08:35:25 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-01-25T08:35:25.3769128Z Jan 25 08:35:25 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-01-25T08:35:25.3770125Z Jan 25 08:35:25 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-01-25T08:35:25.3771118Z Jan 25 08:35:25 at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 2022-01-25T08:35:25.3772264Z Jan 25 08:35:25 at org.junit.runners.Suite.runChild(Suite.java:128)
>
[jira] [Updated] (FLINK-25813) TableITCase.testCollectWithClose failed on azure
[ https://issues.apache.org/jira/browse/FLINK-25813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao updated FLINK-25813:
Labels: pull-request-available test-stability (was: auto-deprioritized-critical pull-request-available test-stability)

> TableITCase.testCollectWithClose failed on azure
>
> Key: FLINK-25813
> URL: https://issues.apache.org/jira/browse/FLINK-25813
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.15.0
> Reporter: Yun Gao
> Assignee: Caizhi Weng
> Priority: Major
> Labels: pull-request-available, test-stability
[jira] [Commented] (FLINK-25256) Savepoints do not work with ExternallyInducedSources
[ https://issues.apache.org/jira/browse/FLINK-25256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500557#comment-17500557 ]

Dawid Wysakowicz commented on FLINK-25256:

I don't have an opinion on a more explicit control plane at the moment. I'll abstain from that discussion for now; I hope that's fine.

The current situation is that we have had interfaces for that use case for some time already. Unfortunately, they do not work well in many situations. Do any of you have spare cycles to work on improvements there to get them to a usable state? We in the runtime team would be happy to guide such a contribution through.

> Savepoints do not work with ExternallyInducedSources
>
> Key: FLINK-25256
> URL: https://issues.apache.org/jira/browse/FLINK-25256
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.0, 1.13.3
> Reporter: Dawid Wysakowicz
> Priority: Major
>
> It is not possible to take a proper savepoint with {{ExternallyInducedSource}} or {{ExternallyInducedSourceReader}} (both legacy and FLIP-27 versions). The problem is that we're hardcoding {{CheckpointOptions}} in the {{triggerHook}}.
> The outcome of the current state is that operators try to take checkpoints in the checkpoint location, whereas the {{CheckpointCoordinator}} writes metadata for those states in the savepoint location.
> Moreover, the situation gets even weirder (I have not checked it entirely) if we have a mixture of {{ExternallyInducedSource(s)}} and regular sources. In such a case the location and format in which the state of a particular task is persisted depend on the order in which barriers arrive. If a barrier from a regular source arrives last, the task takes a savepoint; if the last barrier is from an externally induced source, it takes a checkpoint.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
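The barrier-order dependence described in the issue can be pictured with a toy model. This is only a sketch of the reported behaviour, not Flink runtime code: `BarrierOrderSketch`, `SnapshotKind` and `kindTaken` are hypothetical names introduced for illustration, and the model assumes that a regular source emits a savepoint barrier while an externally induced source emits a checkpoint barrier because of the hardcoded options.

```java
import java.util.List;

/** Toy model of the behaviour reported in FLINK-25256; not Flink runtime code. */
final class BarrierOrderSketch {

    /** Hypothetical stand-in for the snapshot kind a barrier carries. */
    enum SnapshotKind { CHECKPOINT, SAVEPOINT }

    /**
     * Returns the kind of snapshot a task ends up taking. Per the issue text,
     * whichever barrier arrives last decides the outcome.
     */
    static SnapshotKind kindTaken(List<SnapshotKind> barriersInArrivalOrder) {
        if (barriersInArrivalOrder.isEmpty()) {
            throw new IllegalArgumentException("at least one barrier is required");
        }
        return barriersInArrivalOrder.get(barriersInArrivalOrder.size() - 1);
    }
}
```

Under these assumptions, swapping the arrival order of the same two barriers flips the result, which is why the persisted location and format become non-deterministic for tasks fed by both kinds of source.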
[jira] [Commented] (FLINK-26321) KafkaSinkITCase.testWriteRecordsToKafkaWithExactlyOnceGuarantee fails on azure
[ https://issues.apache.org/jira/browse/FLINK-26321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500559#comment-17500559 ]

Yun Gao commented on FLINK-26321:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32459=logs=c5612577-f1f7-5977-6ff6-7432788526f7=ffa8837a-b445-534e-cdf4-db364cf8235d=36032

> KafkaSinkITCase.testWriteRecordsToKafkaWithExactlyOnceGuarantee fails on azure
>
> Key: FLINK-26321
> URL: https://issues.apache.org/jira/browse/FLINK-26321
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0
> Reporter: Matthias Pohl
> Assignee: Alexander Preuss
> Priority: Critical
> Labels: test-stability
>
> [This build|https://dev.azure.com/mapohl/flink/_build/results?buildId=769=logs=d543d572-9428-5803-a30c-e8e09bf70915=4e4199a3-fbbb-5d5b-a2be-802955ffb013=35635] failed due to a test failure in {{KafkaSinkITCase.testWriteRecordsToKafkaWithExactlyOnceGuarantee}}:
> {code}
> Feb 22 21:55:15 [ERROR] Tests run: 8, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 65.897 s <<< FAILURE! - in org.apache.flink.connector.kafka.sink.KafkaSinkITCase
> Feb 22 21:55:15 [ERROR] org.apache.flink.connector.kafka.sink.KafkaSinkITCase.testWriteRecordsToKafkaWithExactlyOnceGuarantee Time elapsed: 25.251 s <<< FAILURE!
> Feb 22 21:55:15 java.lang.AssertionError: expected:<20912> but was:<20913>
> Feb 22 21:55:15 at org.junit.Assert.fail(Assert.java:89)
> Feb 22 21:55:15 at org.junit.Assert.failNotEquals(Assert.java:835)
> Feb 22 21:55:15 at org.junit.Assert.assertEquals(Assert.java:647)
> Feb 22 21:55:15 at org.junit.Assert.assertEquals(Assert.java:633)
> Feb 22 21:55:15 at org.apache.flink.connector.kafka.sink.KafkaSinkITCase.writeRecordsToKafka(KafkaSinkITCase.java:424)
> Feb 22 21:55:15 at org.apache.flink.connector.kafka.sink.KafkaSinkITCase.testWriteRecordsToKafkaWithExactlyOnceGuarantee(KafkaSinkITCase.java:231)
> [...]
> {code}
[GitHub] [flink] zoucao commented on pull request #18966: [FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs
zoucao commented on pull request #18966: URL: https://github.com/apache/flink/pull/18966#issuecomment-1057763421 cc @slinkydeveloper, could you help me find somebody to review it?
[GitHub] [flink] flinkbot edited a comment on pull request #18838: [FLINK-26177][Connector/pulsar] Use testcontainer pulsar runtime instead o…
flinkbot edited a comment on pull request #18838: URL: https://github.com/apache/flink/pull/18838#issuecomment-1044141081 ## CI report: * b737720f52bd34e5e9bde6e0e87f45098f8ad27d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32467) * 114fce453eb7b291e0fdde65ff8e9cbd2cc8aa9a UNKNOWN
[jira] [Updated] (FLINK-26422) Update Chinese documentation with the new TablePipeline docs
[ https://issues.apache.org/jira/browse/FLINK-26422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-26422: Labels: chinese-translation pull-request-available (was: chinese-translation) > Update Chinese documentation with the new TablePipeline docs > > Key: FLINK-26422 > URL: https://issues.apache.org/jira/browse/FLINK-26422 > Project: Flink > Issue Type: Bug > Components: chinese-translation, Documentation > Reporter: Francesco Guardiani > Assignee: zoucao > Priority: Major > Labels: chinese-translation, pull-request-available > > The Chinese docs need to be updated with the content of this commit: > https://github.com/apache/flink/commit/4f65c7950f2c3ef849f2094deab0e199ffedf57b
[GitHub] [flink] zoucao opened a new pull request #18966: [FLINK-26422][docs-zh][table]update chinese doc with the new TablePipeline docs
zoucao opened a new pull request #18966:
URL: https://github.com/apache/flink/pull/18966

…line docs

## What is the purpose of the change

- Update the Chinese docs with the new TablePipeline docs from https://github.com/apache/flink/pull/18804.

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Updated] (FLINK-26313) Support Streaming KMeans in Flink ML
[ https://issues.apache.org/jira/browse/FLINK-26313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-26313: Labels: pull-request-available (was: ) > Support Streaming KMeans in Flink ML > > Key: FLINK-26313 > URL: https://issues.apache.org/jira/browse/FLINK-26313 > Project: Flink > Issue Type: New Feature > Reporter: Yunfeng Zhou > Priority: Major > Labels: pull-request-available > > Modify Flink ML's KMeans algorithm to support online model training and update.
[GitHub] [flink-ml] yunfengzhou-hub opened a new pull request #70: [FLINK-26313] Support Streaming KMeans in Flink ML
yunfengzhou-hub opened a new pull request #70:
URL: https://github.com/apache/flink-ml/pull/70

## What is the purpose of the change

This PR adds an Estimator and a Transformer for the Streaming KMeans operator. Compared with the existing KMeans operator, Streaming KMeans allows training a KMeans model continuously from an unbounded stream of training data. The corresponding Model operator also supports updating model data dynamically from a DataStream.

In addition, this PR adds the simple infrastructure needed to test online algorithms, which makes it possible to control the order in which training data and prediction data are consumed.

## Brief change log

- Adds the `StreamingKMeans`, `StreamingKMeansModel` and `StreamingKMeansParams` classes to support the Streaming KMeans algorithm. Also adds the `StreamingKMeansTest` class to test these classes.
- Adds the `HasBatchStrategy` and `HasDecayFactor` interfaces to represent the corresponding parameters for online algorithms.
- Adds `MockBlockingQueueSinkFunction`, `MockBlockingQueueSourceFunction` and `TestBlockingQueueManager` to control the stream's velocity in test cases for online algorithms.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)
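To make the role of a decay factor in online training more concrete, here is a minimal sketch of one common mini-batch centroid update with forgetting. The PR description does not spell out the update rule, so the formula below is an assumption borrowed from the widely used streaming k-means formulation, and `DecayedCentroidSketch` with its `update` method is a hypothetical name, not part of Flink ML.

```java
/** Toy sketch of a decayed online centroid update; not the Flink ML implementation. */
final class DecayedCentroidSketch {

    /**
     * Merges one mini-batch into a centroid.
     *
     * @param centroid    current centroid coordinates
     * @param oldWeight   weight accumulated by the centroid before this batch
     * @param batchSum    per-dimension sum of the batch points assigned to this centroid
     * @param batchCount  number of batch points assigned to this centroid
     * @param decayFactor 1.0 keeps all history, 0.0 forgets it entirely (assumed semantics)
     */
    static double[] update(
            double[] centroid,
            double oldWeight,
            double[] batchSum,
            long batchCount,
            double decayFactor) {
        double discounted = oldWeight * decayFactor;
        double newWeight = discounted + batchCount;
        if (newWeight == 0.0) {
            // Nothing to average over: leave the centroid where it is.
            return centroid.clone();
        }
        double[] updated = new double[centroid.length];
        for (int i = 0; i < centroid.length; i++) {
            // Weighted average of the discounted history and the new batch.
            updated[i] = (centroid[i] * discounted + batchSum[i]) / newWeight;
        }
        return updated;
    }
}
```

With a decay factor of 1.0 the old centroid and the new batch are averaged by their weights; with 0.0 the centroid jumps to the mean of the current batch. Values in between trade stability against responsiveness to drift.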
[GitHub] [flink] flinkbot edited a comment on pull request #18655: [FLINK-25799] [docs] Translate table/filesystem.md page into Chinese.
flinkbot edited a comment on pull request #18655: URL: https://github.com/apache/flink/pull/18655#issuecomment-1032265310 ## CI report: * 9f3ccd717c3d292cefeb49da137d03afd2c6710c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32463)
[GitHub] [flink] echauchot commented on pull request #18610: [FLINK-23843][runtime] Properly fail the job when SplitEnumeratorContext.runInCoordinatorThread() throws an exception
echauchot commented on pull request #18610: URL: https://github.com/apache/flink/pull/18610#issuecomment-1057757502 Hey @dmvk, thanks!
[GitHub] [flink-table-store] JingsongLi merged pull request #32: [hotfix] Fix ClassCastException when StoreSinkWriter#writeToFileStore encountered non-pk record
JingsongLi merged pull request #32: URL: https://github.com/apache/flink-table-store/pull/32
[jira] [Updated] (FLINK-26313) Support Streaming KMeans in Flink ML
[ https://issues.apache.org/jira/browse/FLINK-26313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunfeng Zhou updated FLINK-26313: Summary: Support Streaming KMeans in Flink ML (was: Support Online KMeans in Flink ML) > Support Streaming KMeans in Flink ML > > Key: FLINK-26313 > URL: https://issues.apache.org/jira/browse/FLINK-26313 > Project: Flink > Issue Type: New Feature > Reporter: Yunfeng Zhou > Priority: Major > > Modify Flink ML's KMeans algorithm to support online model training and update.
[GitHub] [flink-table-store] LadyForest commented on a change in pull request #32: [hotfix] Fix ClassCastException when StoreSinkWriter#writeToFileStore encountered non-pk record
LadyForest commented on a change in pull request #32: URL: https://github.com/apache/flink-table-store/pull/32#discussion_r818373831 ## File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java ## @@ -55,13 +55,15 @@ public boolean expired = false; +public boolean hasPk; Review comment: > TestFileStore Sure. Then we cannot reuse `fileStore` in `StoreSinkTest`
[GitHub] [flink-table-store] LadyForest commented on a change in pull request #32: [hotfix] Fix ClassCastException when StoreSinkWriter#writeToFileStore encountered non-pk record
LadyForest commented on a change in pull request #32: URL: https://github.com/apache/flink-table-store/pull/32#discussion_r818373831 ## File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java ## @@ -55,13 +55,15 @@ public boolean expired = false; +public boolean hasPk; Review comment: > TestFileStore Sure. Then we cannot reuse `fileStore`
[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #32: [hotfix] Fix ClassCastException when StoreSinkWriter#writeToFileStore encountered non-pk record
JingsongLi commented on a change in pull request #32: URL: https://github.com/apache/flink-table-store/pull/32#discussion_r818372717 ## File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java ## @@ -55,13 +55,15 @@ public boolean expired = false; +public boolean hasPk; Review comment: Add a constructor: `public TestFileStore(boolean withPrimaryKey)`?
[jira] [Updated] (FLINK-26346) Add statistics collecting to sst files
[ https://issues.apache.org/jira/browse/FLINK-26346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-26346: --- Labels: pull-request-available (was: ) > Add statistics collecting to sst files > -- > > Key: FLINK-26346 > URL: https://issues.apache.org/jira/browse/FLINK-26346 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: 0.1.0 >Reporter: Caizhi Weng >Priority: Major > Labels: pull-request-available > > Currently field statistics are not collected in sst files. With statistics we > can do filter and other operations with better performance. > Some formats like orc already record statistics into file headers, so for > these special formats we just need to read them directly from files. For > others, however, we need to collect the statistics by hand. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-table-store] tsreaper opened a new pull request #33: [FLINK-26346] Add statistics collecting to sst files
tsreaper opened a new pull request #33: URL: https://github.com/apache/flink-table-store/pull/33 Currently field statistics are not collected in sst files. With statistics we can do filter and other operations with better performance. Some formats like orc already record statistics into file headers, so for these special formats we just need to read them directly from files. For others, however, we need to collect the statistics by hand.
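For formats that do not record statistics themselves, collecting them by hand could look roughly like the sketch below. It is only an illustration: the class `FieldStatsCollector` and its methods are hypothetical names, not the table store's API, and it assumes the statistics of interest are the minimum, maximum, and null count of a single field.

```java
/** Hypothetical per-field statistics collector; not part of flink-table-store. */
final class FieldStatsCollector<T extends Comparable<T>> {
    private T min;
    private T max;
    private long nullCount;

    /** Updates the running statistics with one field value (may be null). */
    void collect(T value) {
        if (value == null) {
            nullCount++;
            return;
        }
        if (min == null || value.compareTo(min) < 0) {
            min = value;
        }
        if (max == null || value.compareTo(max) > 0) {
            max = value;
        }
    }

    T min() {
        return min;
    }

    T max() {
        return max;
    }

    long nullCount() {
        return nullCount;
    }

    public static void main(String[] args) {
        FieldStatsCollector<Integer> stats = new FieldStatsCollector<>();
        for (Integer v : new Integer[] {3, null, -1, 7}) {
            stats.collect(v);
        }
        System.out.println(stats.min() + " " + stats.max() + " " + stats.nullCount());
    }
}
```

With such statistics stored next to a file, a reader could skip the file for a filter like `field > 10` whenever the recorded maximum is not greater than 10.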
[GitHub] [flink-table-store] LadyForest opened a new pull request #32: [hotfix] Fix ClassCastException when StoreSinkWriter#writeToFileStore encountered non-pk record
LadyForest opened a new pull request #32: URL: https://github.com/apache/flink-table-store/pull/32 # What is the purpose of the change This PR tries to fix the `ClassCastException` thrown when `StoreSinkWriter#writeToFileStore` encounters a record without a primary key. When a record has no pk, the value should be a count of type `long`, but the initial value is assigned as an `int`, which causes a `ClassCastException` at runtime. # Verifying this change `StoreSinkTest` is modified slightly to check the value type. Rolling back the source code change reproduces the issue.
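The failure mode described in this PR can be reproduced in isolation. The sketch below is not the table store code: `CountValueSketch` and `readCount` are hypothetical names, and it assumes the count is stored as an untyped `Object` field that is later cast to `Long`.

```java
/** Hypothetical stand-in for reading a non-pk record's count from an untyped field. */
final class CountValueSketch {

    /** Reads the count; the cast fails if the field was boxed as an Integer. */
    static long readCount(Object field) {
        return (Long) field;
    }

    /** Returns true if reading the field throws a ClassCastException. */
    static boolean failsWithClassCast(Object field) {
        try {
            readCount(field);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object intInit = 1;   // autoboxed to Integer, like an int initial value
        Object longInit = 1L; // autoboxed to Long, the expected count type
        System.out.println(failsWithClassCast(intInit));
        System.out.println(readCount(longInit));
    }
}
```

Java does not widen a boxed `Integer` to `Long` on a cast, which is why initializing the count with a `long` literal avoids the exception.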
[GitHub] [flink] flinkbot edited a comment on pull request #18958: [FLINK-15854][hive] Use the new type inference for Hive UDTF
flinkbot edited a comment on pull request #18958: URL: https://github.com/apache/flink/pull/18958#issuecomment-1056725576 ## CI report: * c833ce87dd2f24289efe7ad83cc476c0689a043c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32462) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18964: [FLINK-26455][state/changelog] Don't fail on materialization cancellation
flinkbot edited a comment on pull request #18964: URL: https://github.com/apache/flink/pull/18964#issuecomment-1057523838 ## CI report: * 43531c4f9e06f8c8b2d3595971161f61ea3008c6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32461) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-statefun] igalshilman commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration
igalshilman commented on a change in pull request #306: URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818360880 ## File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml ## @@ -16,5 +16,11 @@ kind: io.statefun.endpoints.v2/http spec: functions: statefun.smoke.e2e/command-interpreter-fn - urlPathTemplate: http://remote-function-host:8000 - maxNumBatchRequests: 1 \ No newline at end of file + urlPathTemplate: https://remote-function-host:8000 + maxNumBatchRequests: 1 + transport: +type: io.statefun.transports.v1/async +trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem +client_cert: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt +client_key: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8 +client_key_password: test Review comment: I don't think I understand. If you can specify it in a `module.yaml` as a utf8 string, then it should be possible to read the exact same string from a file, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-statefun] igalshilman commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration
igalshilman commented on a change in pull request #306: URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818359939 ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java ## @@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) { } channel.close().addListener(ignored -> pool.release(channel)); } + + private static SslContext getSslContext(NettyRequestReplySpec spec) { +Optional maybeTrustCaCerts = spec.getTrustedCaCertsOptional(); +Optional maybeClientCerts = spec.getClientCertsOptional(); +Optional maybeClientKey = spec.getClientKeyOptional(); +Optional maybeKeyPassword = spec.getClientKeyPasswordOptional(); + +if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) { + throw new IllegalStateException( + "You provided a client cert, but not a client key. Cannot continue."); +} +if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) { Review comment: Oh, I see, that makes sense.
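The pairing check under review can be read as: a client certificate and a client key must be supplied together or not at all. A minimal standalone sketch of that rule follows. The class and method names are hypothetical, the `Optional<String>` element type is an assumption, and the second error message is invented for illustration because the diff above is cut off before it.

```java
import java.util.Optional;

/** Hypothetical standalone version of the cert/key pairing check. */
final class ClientTlsCheckSketch {

    /** Throws if exactly one of client cert and client key is configured. */
    static void requireCertAndKeyTogether(
            Optional<String> clientCert, Optional<String> clientKey) {
        if (clientCert.isPresent() && !clientKey.isPresent()) {
            // Message taken from the diff above.
            throw new IllegalStateException(
                    "You provided a client cert, but not a client key. Cannot continue.");
        }
        if (clientKey.isPresent() && !clientCert.isPresent()) {
            // Assumed wording; the original message is not visible in the diff.
            throw new IllegalStateException(
                    "You provided a client key, but not a client cert. Cannot continue.");
        }
    }

    public static void main(String[] args) {
        // Both absent and both present are accepted.
        requireCertAndKeyTogether(Optional.empty(), Optional.empty());
        requireCertAndKeyTogether(Optional.of("a_client.crt"), Optional.of("a_client.key.p8"));
        System.out.println("ok");
    }
}
```

Keeping two separate checks, as the diff does, lets each error message say which half of the pair is missing.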
[jira] [Commented] (FLINK-26179) Support periodic savepointing in the operator
[ https://issues.apache.org/jira/browse/FLINK-26179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500532#comment-17500532 ] Yun Tang commented on FLINK-26179: -- I don't think the Flink community should support automatic savepoints in the Flink k8s operator currently. From my understanding, the k8s operator made by Google introduced automatic savepoints for the purpose of [auto failover|https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/savepoints_guide.md#automatically-restarting-job-from-the-latest-savepoint]. Since we already support high availability and can restore from LAST_STATE, I cannot see any benefits compared with periodic checkpoints. The current default savepoint format type is still CANONICAL, which scans the whole DB to write all the key-value pairs one by one. This is extremely slow and expensive, not to mention the additional memory usage and the additional disk usage on the DFS in large-scale cases. > Support periodic savepointing in the operator > - > > Key: FLINK-26179 > URL: https://issues.apache.org/jira/browse/FLINK-26179 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Major > > Automatic triggering of savepoints is a commonly requested feature. The > configuration should be part of the job spec. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #18838: [FLINK-26177][Connector/pulsar] Use testcontainer pulsar runtime instead o…
flinkbot edited a comment on pull request #18838: URL: https://github.com/apache/flink/pull/18838#issuecomment-1044141081 ## CI report: * b737720f52bd34e5e9bde6e0e87f45098f8ad27d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32467) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dianfu commented on a change in pull request #18957: [FLINK-26444][python]Window allocator supporting pyflink datastream API
dianfu commented on a change in pull request #18957: URL: https://github.com/apache/flink/pull/18957#discussion_r818274208 ## File path: flink-python/pyflink/fn_execution/datastream/window/window_assigner.py ## @@ -0,0 +1,376 @@ +import math +from typing import Iterable, Collection + +from pyflink.common import TypeSerializer, Time +from pyflink.common.typeinfo import Types +from pyflink.datastream import Trigger +from pyflink.datastream.state import ValueStateDescriptor, ValueState, ReducingStateDescriptor +from pyflink.datastream.window import TimeWindow, CountWindow, WindowAssigner, T, TimeWindowSerializer, TriggerResult, \ +CountWindowSerializer, MergingWindowAssigner +from pyflink.fn_execution.table.window_context import W + + +class EventTimeTrigger(Trigger[object, TimeWindow]): +""" +A Trigger that fires once the watermark passes the end of the window to which a pane belongs. +""" +def on_element(self, + element: object, + timestamp: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> TriggerResult: +if window.max_timestamp() <= ctx.get_current_watermark(): +return TriggerResult.FIRE +else: +ctx.register_event_time_timer(window.max_timestamp()) +# No action is taken on the window. +return TriggerResult.CONTINUE + +def on_processing_time(self, + time: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> TriggerResult: +# No action is taken on the window. +return TriggerResult.CONTINUE + +def on_event_time(self, + time: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> TriggerResult: +if time == window.max_timestamp(): +return TriggerResult.FIRE +else: +# No action is taken on the window. 
+return TriggerResult.CONTINUE + Review comment: need to override the can_merge method ## File path: flink-python/pyflink/fn_execution/datastream/window/window_assigner.py ## @@ -0,0 +1,376 @@ +import math Review comment: missing license header ## File path: flink-python/pyflink/fn_execution/datastream/window/window_assigner.py ## @@ -0,0 +1,376 @@ +import math +from typing import Iterable, Collection + +from pyflink.common import TypeSerializer, Time +from pyflink.common.typeinfo import Types +from pyflink.datastream import Trigger +from pyflink.datastream.state import ValueStateDescriptor, ValueState, ReducingStateDescriptor +from pyflink.datastream.window import TimeWindow, CountWindow, WindowAssigner, T, TimeWindowSerializer, TriggerResult, \ +CountWindowSerializer, MergingWindowAssigner +from pyflink.fn_execution.table.window_context import W + + +class EventTimeTrigger(Trigger[object, TimeWindow]): +""" +A Trigger that fires once the watermark passes the end of the window to which a pane belongs. +""" +def on_element(self, + element: object, + timestamp: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> TriggerResult: +if window.max_timestamp() <= ctx.get_current_watermark(): +return TriggerResult.FIRE +else: +ctx.register_event_time_timer(window.max_timestamp()) +# No action is taken on the window. +return TriggerResult.CONTINUE + +def on_processing_time(self, + time: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> TriggerResult: +# No action is taken on the window. +return TriggerResult.CONTINUE + +def on_event_time(self, + time: int, + window: TimeWindow, + ctx: 'Trigger.TriggerContext') -> TriggerResult: +if time == window.max_timestamp(): +return TriggerResult.FIRE +else: +# No action is taken on the window. 
+return TriggerResult.CONTINUE + +def on_merge(self, + window: TimeWindow, + ctx: 'Trigger.OnMergeContext') -> None: +windowMaxTimestamp = window.max_timestamp() +if windowMaxTimestamp >= ctx.get_current_watermark(): Review comment: ```suggestion if windowMaxTimestamp > ctx.get_current_watermark(): ``` ## File path: flink-python/pyflink/fn_execution/datastream/window/window_assigner.py ## @@ -0,0 +1,376 @@ +import math +from typing import Iterable, Collection + +from pyflink.common import TypeSerializer, Time +from pyflink.common.typeinfo import Types +from pyflink.datastream import Trigger +from pyflink.datastream.state import
[GitHub] [flink] flinkbot edited a comment on pull request #18838: [FLINK-26177][Connector/pulsar] Use testcontainer pulsar runtime instead o…
flinkbot edited a comment on pull request #18838: URL: https://github.com/apache/flink/pull/18838#issuecomment-1044141081 ## CI report: * b737720f52bd34e5e9bde6e0e87f45098f8ad27d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32467) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] hongshuboy edited a comment on pull request #18851: [hotfix][docs] Unified variable naming
hongshuboy edited a comment on pull request #18851: URL: https://github.com/apache/flink/pull/18851#issuecomment-1057716786 @knaufk Copy that. Should I create an improvement on [Flink’s Jira](http://issues.apache.org/jira/browse/FLINK), or commit to this branch directly?
[GitHub] [flink] hongshuboy commented on pull request #18851: [hotfix][docs] Unified variable naming
hongshuboy commented on pull request #18851: URL: https://github.com/apache/flink/pull/18851#issuecomment-1057716786 @knaufk Copy that. Should I create an improvement on [Flink’s Jira](http://issues.apache.org/jira/browse/FLINK), or commit to this branch directly?
[jira] [Closed] (FLINK-26247) Introduce a better expire strategy
[ https://issues.apache.org/jira/browse/FLINK-26247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-26247. Resolution: Fixed master: 57134d35fdd7bf3db1fd5048207fc4bacf198667 > Introduce a better expire strategy > -- > > Key: FLINK-26247 > URL: https://issues.apache.org/jira/browse/FLINK-26247 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Reporter: Jingsong Lee >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.1.0 > > > We can add the snapshot id in which the file/manifest was added to the table. > With this snapshot id, we can have a better expire strategy instead of > scanning all files of the snapshot. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-26399) Make some option of operator configurable
[ https://issues.apache.org/jira/browse/FLINK-26399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-26399. -- Resolution: Fixed Merged: 43e171627e5cf47a866958c1e88b5c182b686802 > Make some option of operator configurable > - > > Key: FLINK-26399 > URL: https://issues.apache.org/jira/browse/FLINK-26399 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Aitozi >Assignee: Aitozi >Priority: Major > Labels: pull-request-available > > As described in > [pr|https://github.com/apache/flink-kubernetes-operator/pull/28], we had better > use options to control the operator-related configs. I will first make the > scattered > static config variables in the current version configurable. cc [~gyfora] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-table-store] JingsongLi merged pull request #30: [FLINK-26247] Optimize expire by only reading new changes in a snapshot
JingsongLi merged pull request #30: URL: https://github.com/apache/flink-table-store/pull/30
[jira] [Commented] (FLINK-26416) Release Testing: Sink V2 sanity checks
[ https://issues.apache.org/jira/browse/FLINK-26416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500521#comment-17500521 ] Liu commented on FLINK-26416: - [~fpaul] Thanks. I would like to test it after the blocker finishes. > Release Testing: Sink V2 sanity checks > -- > > Key: FLINK-26416 > URL: https://issues.apache.org/jira/browse/FLINK-26416 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.15.0 >Reporter: Fabian Paul >Priority: Blocker > Labels: release-testing > Fix For: 1.15.0 > > > With the introduction of Sink V2, the operator model of the sink changed > slightly therefore it makes sense to test different upgrade/sanity scenarios. > > You can take any of the existing Sinks in the project. I would recommend the > FileSink. > > # Run a job with Flink 1.14 and take a savepoint and try to restore and > resume with 1.15 > # Run a job with Flink 1.15 and take a savepoint and try to restore and > resume with 1.15 > # Run a bounded job with Flink 1.15 > > In all cases, please verify that all records have been written at the end of > the scenario and there are no duplicates. > > > > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] KarmaGYZ closed pull request #18959: [hotfix][runtime] Remove unused ResourceManagerServices
KarmaGYZ closed pull request #18959: URL: https://github.com/apache/flink/pull/18959
[jira] [Created] (FLINK-26458) Rename Accumulator to MergeFunction
Jingsong Lee created FLINK-26458: Summary: Rename Accumulator to MergeFunction Key: FLINK-26458 URL: https://issues.apache.org/jira/browse/FLINK-26458 Project: Flink Issue Type: Sub-task Components: Table Store Reporter: Jingsong Lee Fix For: table-store-0.1.0 See org.apache.flink.table.store.file.mergetree.compact.Accumulator. Actually, it is not an accumulator, but a merger. The naming of the accumulator is misleading. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26457) Introduce Join Accumulator for Wide table
Jingsong Lee created FLINK-26457: Summary: Introduce Join Accumulator for Wide table Key: FLINK-26457 URL: https://issues.apache.org/jira/browse/FLINK-26457 Project: Flink Issue Type: Sub-task Components: Table Store Reporter: Jingsong Lee Fix For: table-store-0.1.0 Consider a Join Accumulator: it merges two records, completing the non-null fields. It is very useful for wide tables, where two source tables join together to form a wide table. -- This message was sent by Atlassian Jira (v8.20.1#820001)
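One way to picture the proposed merge is the sketch below. It is not the table store implementation: `JoinMergeSketch` is a hypothetical name, records are modeled as `Object[]` rows of equal arity where `null` means "field not provided", and the choice that the second record wins when both sides are non-null is an assumption the issue does not specify.

```java
import java.util.Arrays;

/** Hypothetical illustration of merging two partial rows into one wide row. */
final class JoinMergeSketch {

    /** Returns a new row taking each field from whichever input has it set. */
    static Object[] merge(Object[] first, Object[] second) {
        if (first.length != second.length) {
            throw new IllegalArgumentException("Rows must have the same arity");
        }
        Object[] merged = new Object[first.length];
        for (int i = 0; i < first.length; i++) {
            // Assumption: the second record wins when both fields are non-null.
            merged[i] = second[i] != null ? second[i] : first[i];
        }
        return merged;
    }

    public static void main(String[] args) {
        // One source table fills the first two columns, the other fills the third.
        Object[] fromTableA = {1, "a", null};
        Object[] fromTableB = {null, null, 3.0};
        System.out.println(Arrays.toString(merge(fromTableA, fromTableB)));
    }
}
```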
[jira] [Updated] (FLINK-26452) Flink deploy on k8s when kubeconfig server is hostname not ip
[ https://issues.apache.org/jira/browse/FLINK-26452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hjw updated FLINK-26452: Description: ~/.kube/config apiVersion:v1 kind:config cluster: -name: "yf-dev-cluster1" cluster: server: "https://in-acpmanager.test.yfzx.cn/k8s/clusters/c-t5h2t; certificate-authority-data : “……" {code:java} 2022-03-02 18:59:30 | OkHttp https://in-acpmanager.test.yfzx.cn/...io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener Exec Failure javax.net.ssl.SSLPeerUnverifiedException Hostname in-acpmanager.test.yfzx.cn not verified: certificate: sha256/cw2T2s+Swhl7z+H35/3C1dTLxL26OOMO5VoEN9kAZCA= DN: CN=in-acpmanager.test.yfzx.cn subjectAltNames: [] io.fabric8.kubernetes.client.KubernetesClientException: Failed to start websocket at io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener.onFailure(WatcherWebSocketListener.java:77) at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:570) at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$1.onFailure(RealWebSocket.java:216) at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:180) at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Suppressed: java.lang.Throwable: waiting here at io.fabric8.kubernetes.client.utils.Utils.waitUntilReady(Utils.java:164) at io.fabric8.kubernetes.client.utils.Utils.waitUntilReadyOrFail(Utils.java:175) at io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener.waitUntilReady(WatcherWebSocketListener.java:120) at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.waitUntilReady(WatchConnectionManager.java:82) at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java:705) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java:678) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java: {code} {code:java} Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname in-acpmanager.test.yfzx.cn not verified: certificate: sha256/cw2T2s+Swhl7z+H35/3C1dTLxL26OOMO5VoEN9kAZCA= DN: CN=in-acpmanager.test.yfzx.cn subjectAltNames: [] at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:350) at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:300) at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.connect(RealConnection.java:185) at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:224) at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108) at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88) at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169) at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41) at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) at org.apache.flink.kubernetes.shaded.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94) at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) at org.apache.flink.kubernetes.shade {code} By the way, the "kubectl get pod -n namespace" command succeeds on this node. The node is configured with DNS. was: ~/.kube/config apiVersion:v1 kind:config cluster: -name: "yf-dev-cluster1" cluster: server: "https://in-acpmanager.test.yfzx.cn/k8s/clusters/c-t5h2t; certificate-authority-data : “……" {code:java} 2022-03-02 18:59:30 | OkHttp
[GitHub] [flink] flinkbot edited a comment on pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
flinkbot edited a comment on pull request #18965: URL: https://github.com/apache/flink/pull/18965#issuecomment-1057611030 ## CI report: * 868aa2c455dd900823290ba207edf2ef81c7d2d5 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32464) * 8926624d81a884d9c4decd9948e90e8829a4228f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32468) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
flinkbot edited a comment on pull request #18965: URL: https://github.com/apache/flink/pull/18965#issuecomment-1057611030 ## CI report: * 868aa2c455dd900823290ba207edf2ef81c7d2d5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32464) * 8926624d81a884d9c4decd9948e90e8829a4228f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #18838: [FLINK-26177][Connector/pulsar] Use testcontainer pulsar runtime instead o…
flinkbot edited a comment on pull request #18838: URL: https://github.com/apache/flink/pull/18838#issuecomment-1044141081 ## CI report: * 1cfecf5bdb46f6431f758fb299585a2b1ce4fcd7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32413) * b737720f52bd34e5e9bde6e0e87f45098f8ad27d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32467) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] PatrickRen commented on pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
PatrickRen commented on pull request #18965: URL: https://github.com/apache/flink/pull/18965#issuecomment-1057664371 Thanks @leonardBang for the review! I made another push just now. Please have a look when you are available.
[GitHub] [flink] flinkbot edited a comment on pull request #18838: [FLINK-26177][Connector/pulsar] Use testcontainer pulsar runtime instead o…
flinkbot edited a comment on pull request #18838: URL: https://github.com/apache/flink/pull/18838#issuecomment-1044141081 ## CI report: * 1cfecf5bdb46f6431f758fb299585a2b1ce4fcd7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32413) * b737720f52bd34e5e9bde6e0e87f45098f8ad27d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] PatrickRen commented on a change in pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
PatrickRen commented on a change in pull request #18965: URL: https://github.com/apache/flink/pull/18965#discussion_r818310017
## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ##
@@ -496,17 +494,4 @@ private void unpause(int brokerId) throws Exception {
         pausedBroker.remove(brokerId);
         LOG.info("Broker {} is resumed", brokerId);
     }
-
-    private KafkaConsumer createTempConsumer() {
-        Properties consumerProps = new Properties();
-        consumerProps.putAll(getStandardProperties());
-        consumerProps.setProperty(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                VoidDeserializer.class.getCanonicalName());
-        consumerProps.setProperty(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                VoidDeserializer.class.getCanonicalName());
-        consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
-        return new KafkaConsumer<>(consumerProps);
-    }
Review comment: Thanks for the reminder! This class has too many warnings (technical debt), so I didn't notice this one. Maybe we need a giant refactor of the Kafka test utils in the future. They are deeply bound to the legacy FlinkKafkaProducer and FlinkKafkaConsumer now, which causes a lot of horrible warnings in the IDE.
[GitHub] [flink] PatrickRen commented on a change in pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
PatrickRen commented on a change in pull request #18965: URL: https://github.com/apache/flink/pull/18965#discussion_r818308240
## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ##
@@ -2610,7 +2620,9 @@ public T map(T value) throws Exception {
             if (failer && numElementsTotal >= failCount) {
                 // shut down a Kafka broker
-                kafkaServer.stopBroker(shutdownBrokerId);
+                kafkaServerToKill.stopBroker(shutdownBrokerId);
+                hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+                killedLeaderBefore = true;
Review comment: Yep, exactly. Sorry for being so careless.
[GitHub] [flink] PatrickRen commented on pull request #18863: [FLINK-26033][flink-connector-kafka]Fix the problem that robin does not take effect due to upgrading kafka client to 2.4.1 since Flink1.11
PatrickRen commented on pull request #18863: URL: https://github.com/apache/flink/pull/18863#issuecomment-1057658378 Thanks for the update @shizhengchao ! I think we can preserve your initial implementation of FlinkKafkaPartitioner to resolve the issue caused by KAFKA-9965.
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500489#comment-17500489 ] Qingsheng Ren commented on FLINK-26033: --- Thanks [~tinny] for the explanation. I think this is related to KAFKA-9965, which has not been resolved yet. Implementing our own partitioner looks good to me; it would also respect backward compatibility, since Kafka's integrated round-robin partitioner is only available after 2.4.0.
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
> --
> Key: FLINK-26033 > URL: https://issues.apache.org/jira/browse/FLINK-26033 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3 > Reporter: shizhengchao > Assignee: shizhengchao > Priority: Major > Labels: pull-request-available
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect. Flink treats 'default' and 'round-robin' as the same strategy.
> {code:java}
> // code placeholder
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                             // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> }
> {code}
> Both options therefore use Kafka's default partitioner, and DefaultPartitioner only has two partitioning scenarios:
> 1. Random when there is no key
> 2. When there is a key, take the modulo according to the key
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value,
>         byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // Random when there is no key
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> }
> {code}
> Therefore, KafkaConnector does not have a round-robin strategy. But we can borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> // code placeholder
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value,
>             byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
>             return new AtomicInteger(0);
>         });
>         return counter.getAndIncrement();
>     }
>
>     public void close() {}
> }
> {code}
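The round-robin idea proposed for Flink can be sketched without Kafka's `Cluster` metadata. This is a minimal sketch, assuming the partitioner is handed the array of partition ids for the target topic, as `FlinkKafkaPartitioner#partition` is. The class `RoundRobinSketch` and its method are hypothetical names for illustration. It is deliberately standalone (no Flink or Kafka on the classpath) and is not the implementation from the pull request.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, standalone sketch: not part of Flink or Kafka.
class RoundRobinSketch {
    // One counter per topic, so that topics rotate independently.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // The caller passes the partition ids it currently knows for the topic.
    int partition(String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for topic " + targetTopic);
        }
        AtomicInteger counter = counters.computeIfAbsent(targetTopic, k -> new AtomicInteger(0));
        // floorMod keeps the index non-negative even after the int counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), partitions.length);
        return partitions[index];
    }

    public static void main(String[] args) {
        RoundRobinSketch partitioner = new RoundRobinSketch();
        int[] partitions = {0, 1, 2};
        StringBuilder sequence = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            sequence.append(partitioner.partition("t", partitions));
        }
        System.out.println(sequence); // prints 012012
    }
}
```

Because the choice is made once per record on the Flink side, it does not depend on how often the Kafka producer re-invokes its own partitioner.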
[jira] [Commented] (FLINK-26408) retract a non-existent record in RetractableTopNFunction
[ https://issues.apache.org/jira/browse/FLINK-26408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500488#comment-17500488 ] lincoln lee commented on FLINK-26408: - Agree with [~lzljs3620320] that it should be lenient in this case. I created another related issue, FLINK-24666, to unify the behavior of all stream operators for such stale-state errors; let's expect it in 1.16.
> retract a non-existent record in RetractableTopNFunction
> -
> Key: FLINK-26408 > URL: https://issues.apache.org/jira/browse/FLINK-26408 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Table SQL / Runtime > Affects Versions: 1.12.2 > Reporter: Hengyu Dai > Priority: Blocker
>
> RetractableTopNFunction will throw a RuntimeException when
> # the sorted map ValueState<SortedMap<RowData, Long>> treeMap is not empty,
> # and the sorted map doesn't contain the current sort key.
> Now we have a Flink SQL job:
> {code:java}
> // table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)
> select
>   a_key, a_time, a_jk, b_key, b_time, b_jk
> from
> (
>   select
>     a_key, a_time, a_jk, b_key, b_time, b_jk,
>     row_number() over(partition by a_key order by a_time desc) as rn
>   from
>   (
>     select a_key, a_time, a_jk
>     from (
>       select *, row_number() over(partition by a_key order by a_time desc) as rn
>       from table_a
>     ) tmp1
>     where rn = 1
>   ) t1
>   left join
>   (
>     select b_key, b_time, b_jk
>     from (
>       select *, row_number() over(partition by b_key order by b_time desc) as rn
>       from table_b
>     ) tmp2
>     where rn = 1
>   ) t2
>   on t1.a_jk = t2.b_jk
> ) t3
> where rn = 1
> {code}
> The JobGraph looks like:
> {{Source table_a --> Rank_a}}
> {{                          --> Join --> Final Rank}}
> {{Source table_b --> Rank_b}}
> Suppose we have the following input:
> ||ts||SourceA (a_key, a_time, a_jk)||SourceB (b_key, b_time, b_jk)||RankA (a_key, a_time, a_jk)||RankB (b_key, b_time, b_jk)||Join (a_key, b_key, a_time, a_jk)||Final Rank (a_key, b_key, a_time)||
> |t1| |+(b1,1,jk1)| |+(b1,1,jk1)| | |
> |t2| |+(b2,2,jk2)| |+(b2,2,jk2)| | |
> |t3|+(a1,3,jk1)| |+(a1,3,jk1)| |+(a1,b1,3,jk1)|+(a1,b1,3)|
> |t4|+(a1,4,jk1)| |-(a1,3,jk1) +(a1,4,jk1)| |-(a1,b1,3,jk1) +(a1,b1,4,jk1)|-(a1,b1,3) +(a1,b1,4)|
> |t5|+(a1,5,jk2)| |-(a1,4,jk1) +(a1,5,jk2)| |-(a1,b1,4,jk1) +(a1,b2,5,jk2)|-(a1,b1,4) +(a1,b2,5)|
>
> Assume:
> # t4 is almost at the same time, and the Join operator produces 4 messages at t4. As the hash key changed (from jk1 to jk2), +(a1,b2,5,jk2) (which is hashed with jk2) may run on a different task from the other 3 messages (hashed with jk1), and it may arrive at Final Rank earlier than they do.
> # Due to network congestion, high machine load, etc., the messages produced at t4 by the Join operator take a while before they arrive at Final Rank. By the time Final Rank receives them, the state has expired because of State TTL and the treeMap state has been cleared.
> Now if +(a1,b2,5,jk2) arrives at Final Rank first, the sortedMap of partition key a1 will store sort value 5. Then, when -(a1,b1,3,jk1) arrives at Final Rank, it will find that the sortedMap is not empty and does not contain sort key value 3, which meets the conditions for that RuntimeException.
> We met this exception in our production environment (Flink version 1.12.2). It is very serious because, when it happens, the job cannot recover automatically as the state is polluted.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
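Being "lenient" here could look roughly as follows: when a retraction arrives for a sort key that the sorted map no longer holds (for example after state TTL expiry), it is skipped instead of raising an exception. This is a minimal sketch, assuming a plain `TreeMap<Long, Long>` from sort key to record count. `LenientRetractSketch` and its methods are hypothetical names and are not the actual RetractableTopNFunction code.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, standalone sketch: not the Flink operator.
class LenientRetractSketch {
    // sort key -> number of records currently held for that key
    private final SortedMap<Long, Long> treeMap = new TreeMap<>();

    void accumulate(long sortKey) {
        treeMap.merge(sortKey, 1L, Long::sum);
    }

    // Returns true if a record was retracted, false if the key was unknown (stale state).
    boolean retract(long sortKey) {
        Long count = treeMap.get(sortKey);
        if (count == null) {
            // Lenient path: ignore the retraction instead of throwing a RuntimeException.
            return false;
        }
        if (count == 1L) {
            treeMap.remove(sortKey);
        } else {
            treeMap.put(sortKey, count - 1);
        }
        return true;
    }

    boolean contains(long sortKey) {
        return treeMap.containsKey(sortKey);
    }

    public static void main(String[] args) {
        LenientRetractSketch state = new LenientRetractSketch();
        state.accumulate(5L);                  // +(a1,b2,5,jk2) arrives first
        System.out.println(state.retract(3L)); // -(a1,b1,3,jk1): prints false, no exception
        System.out.println(state.contains(5L)); // prints true, state is untouched
    }
}
```

Whether a skipped retraction should also be logged or counted in a metric is a separate design question that the sketch leaves open.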
[GitHub] [flink] flinkbot edited a comment on pull request #18224: [FLINK-25143][test] Add ITCase for periodic materialization
flinkbot edited a comment on pull request #18224: URL: https://github.com/apache/flink/pull/18224#issuecomment-1002391830 ## CI report: * 26a4a291a18a03a7c5819c528b914ae2a95cd5bf Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32346) * d713600ae0d830989882bf365847ce0f150ab94b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32466)
[GitHub] [flink] flinkbot edited a comment on pull request #18224: [FLINK-25143][test] Add ITCase for periodic materialization
flinkbot edited a comment on pull request #18224: URL: https://github.com/apache/flink/pull/18224#issuecomment-1002391830 ## CI report: * 26a4a291a18a03a7c5819c528b914ae2a95cd5bf Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32346) * d713600ae0d830989882bf365847ce0f150ab94b UNKNOWN
[GitHub] [flink] leonardBang commented on a change in pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
leonardBang commented on a change in pull request #18965: URL: https://github.com/apache/flink/pull/18965#discussion_r818288213
## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ##
@@ -1463,23 +1463,30 @@ public void cancel() {
     public void runBrokerFailureTest() throws Exception {
         final String topic = "brokerFailureTestTopic";
+        // Start a temporary multi-broker cluster.
+        // This test case relies on stopping a broker and switching partition leader to another
+        // during the test, so single-broker cluster (kafkaServer) could not fulfill the
+        // requirement.
+        KafkaTestEnvironment multiBrokerCluster = constructKafkaTestEnvironment();
Review comment:
```suggestion
final KafkaTestEnvironment kafkaServerCluster = constructKafkaTestEnvironment();
```
## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ##
@@ -2610,7 +2620,9 @@ public T map(T value) throws Exception {
             if (failer && numElementsTotal >= failCount) {
                 // shut down a Kafka broker
-                kafkaServer.stopBroker(shutdownBrokerId);
+                kafkaServerToKill.stopBroker(shutdownBrokerId);
+                hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+                killedLeaderBefore = true;
Review comment: This was a bug in your last PR that you are correcting now, IIUC?
## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ##
@@ -1463,23 +1463,30 @@ public void cancel() {
     public void runBrokerFailureTest() throws Exception {
         final String topic = "brokerFailureTestTopic";
+        // Start a temporary multi-broker cluster.
+        // This test case relies on stopping a broker and switching partition leader to another
+        // during the test, so single-broker cluster (kafkaServer) could not fulfill the
+        // requirement.
+        KafkaTestEnvironment multiBrokerCluster = constructKafkaTestEnvironment();
Review comment: minor: we can move this piece of code to line `1478` for readability.
## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ##
@@ -496,17 +494,4 @@ private void unpause(int brokerId) throws Exception {
         pausedBroker.remove(brokerId);
         LOG.info("Broker {} is resumed", brokerId);
     }
-
-    private KafkaConsumer createTempConsumer() {
-        Properties consumerProps = new Properties();
-        consumerProps.putAll(getStandardProperties());
-        consumerProps.setProperty(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                VoidDeserializer.class.getCanonicalName());
-        consumerProps.setProperty(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                VoidDeserializer.class.getCanonicalName());
-        consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
-        return new KafkaConsumer<>(consumerProps);
-    }
Review comment: Please check the IDE warnings before opening a PR so that we can avoid this kind of minor issue.
## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ##
@@ -2584,14 +2591,17 @@ private boolean validateSequence(
         public static volatile boolean killedLeaderBefore;
         public static volatile boolean hasBeenCheckpointedBeforeFailure;
+        private static KafkaTestEnvironment kafkaServerToKill;
Review comment: I tend to prefer the name `kafkaServerToShutdown`, because we call the `shutdown` command rather than the `kill` command.
## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ##
@@ -1495,21 +1502,21 @@ public void runBrokerFailureTest() throws Exception {
         env.setRestartStrategy(RestartStrategies.noRestart());
         Properties props = new Properties();
-        props.putAll(standardProps);
-        props.putAll(secureProps);
+        props.putAll(multiBrokerCluster.getStandardProperties());
+        props.putAll(multiBrokerCluster.getSecureProperties());
         getStream(env, topic, schema, props)
                 .map(new PartitionValidatingMapper(parallelism, 1))
-                .map(new BrokerKillingMapper(leaderId, failAfterElements))
+                .map(new BrokerKillingMapper<>(multiBrokerCluster, leaderId, failAfterElements))
                 .addSink(new ValidatingExactlyOnceSink(totalElements))
[GitHub] [flink-statefun] FilKarnicki commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration
FilKarnicki commented on a change in pull request #306: URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818294287
## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java ##
@@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) {
     }
     channel.close().addListener(ignored -> pool.release(channel));
   }
+
+  private static SslContext getSslContext(NettyRequestReplySpec spec) {
+    Optional maybeTrustCaCerts = spec.getTrustedCaCertsOptional();
+    Optional maybeClientCerts = spec.getClientCertsOptional();
+    Optional maybeClientKey = spec.getClientKeyOptional();
+    Optional maybeKeyPassword = spec.getClientKeyPasswordOptional();
+
+    if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) {
+      throw new IllegalStateException(
+          "You provided a client cert, but not a client key. Cannot continue.");
+    }
+    if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) {
Review comment: My intention was to write a kind of XOR gate here. Having neither cert nor key is fine, and having both cert and key is fine, but having one without the other is not. Let me think about whether I can make this better.
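The "XOR gate" described in this comment can be written as a single comparison of the two `isPresent()` results. This is a minimal sketch under that reading. `TlsConfigCheckSketch` and `requireCertAndKeyTogether` are hypothetical names, and the snippet is independent of the statefun `NettyClient` code.

```java
import java.util.Optional;

// Hypothetical, standalone sketch: not the statefun implementation.
class TlsConfigCheckSketch {
    // Accepts "neither" and "both"; rejects exactly one of the two being set.
    static void requireCertAndKeyTogether(Optional<?> clientCert, Optional<?> clientKey) {
        // The two flags differ exactly when one value is present and the other is not (XOR).
        if (clientCert.isPresent() != clientKey.isPresent()) {
            throw new IllegalStateException(
                    "A client cert and a client key must be provided together, or not at all.");
        }
    }

    public static void main(String[] args) {
        requireCertAndKeyTogether(Optional.empty(), Optional.empty());     // fine
        requireCertAndKeyTogether(Optional.of("cert"), Optional.of("key")); // fine
        try {
            requireCertAndKeyTogether(Optional.of("cert"), Optional.empty());
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A single check gives up the two distinct error messages of the original diff; keeping both branches is reasonable if the more specific message matters to users.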
[jira] [Commented] (FLINK-26179) Support periodic savepointing in the operator
[ https://issues.apache.org/jira/browse/FLINK-26179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500484#comment-17500484 ] Yang Wang commented on FLINK-26179: --- I hesitate to introduce this feature in the K8s operator since, IIUC, triggering savepoints might be too expensive in production. Also, we already have the {{LAST_STATE}} upgrade mode from FLINK-26141. I am pulling in [~yunta], who is an expert on the Flink state backend. > Support periodic savepointing in the operator > - > > Key: FLINK-26179 > URL: https://issues.apache.org/jira/browse/FLINK-26179 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator > Reporter: Gyula Fora > Priority: Major > > Automatic triggering of savepoints is a commonly requested feature. The > configuration should be part of the job spec.
[GitHub] [flink-statefun] FilKarnicki commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration
FilKarnicki commented on a change in pull request #306: URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818290846
## File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml ##
@@ -16,5 +16,11 @@ kind: io.statefun.endpoints.v2/http
 spec:
   functions: statefun.smoke.e2e/command-interpreter-fn
-  urlPathTemplate: http://remote-function-host:8000
-  maxNumBatchRequests: 1
\ No newline at end of file
+  urlPathTemplate: https://remote-function-host:8000
+  maxNumBatchRequests: 1
+  transport:
+    type: io.statefun.transports.v1/async
+    trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
+    client_cert: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.crt
+    client_key: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_client.key.p8
+    client_key_password: test
Review comment: I see, I'll make a change. I'm wondering if there's a way to read a password from a file while setting up TLS in Kafka. I'll have a poke around later :)
[jira] [Closed] (FLINK-26403) SinkWriter should emit all the pending committables on endOfInput
[ https://issues.apache.org/jira/browse/FLINK-26403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao closed FLINK-26403. --- Fix Version/s: 1.15.0 Resolution: Fixed > SinkWriter should emit all the pending committables on endOfInput > - > > Key: FLINK-26403 > URL: https://issues.apache.org/jira/browse/FLINK-26403 > Project: Flink > Issue Type: Bug > Components: API / Core > Affects Versions: 1.15.0 > Reporter: Yun Gao > Assignee: Gen Luo > Priority: Blocker > Labels: pull-request-available > Fix For: 1.15.0 > > > Currently the SinkWriterOperator does not drain all the pending committables on > endOfInput() and leaves them until the final checkpoint, where they would in fact be > discarded. This might cause data loss, or cause the CommitterOperator to hang on > endOfInput() because it has not received the expected number of committables.
[jira] [Commented] (FLINK-26403) SinkWriter should emit all the pending committables on endOfInput
[ https://issues.apache.org/jira/browse/FLINK-26403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500482#comment-17500482 ] Yun Gao commented on FLINK-26403: - Fixed on master via 2c9d64e34cfd3025c87c1d3bbd2d1b596df11691. > SinkWriter should emit all the pending committables on endOfInput > - > > Key: FLINK-26403 > URL: https://issues.apache.org/jira/browse/FLINK-26403 > Project: Flink > Issue Type: Bug > Components: API / Core > Affects Versions: 1.15.0 > Reporter: Yun Gao > Assignee: Gen Luo > Priority: Blocker > Labels: pull-request-available > > Currently the SinkWriterOperator does not drain all the pending committables on > endOfInput() and leaves them until the final checkpoint, where they would in fact be > discarded. This might cause data loss, or cause the CommitterOperator to hang on > endOfInput() because it has not received the expected number of committables.
[GitHub] [flink] gaoyunhaii closed pull request #18938: [FLINK-26403] SinkWriter should emit all the pending committables on endOfInput
gaoyunhaii closed pull request #18938: URL: https://github.com/apache/flink/pull/18938
[jira] [Updated] (FLINK-26447) Clean up webhook jar and dependency management
[ https://issues.apache.org/jira/browse/FLINK-26447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-26447: --- Labels: pull-request-available (was: ) > Clean up webhook jar and dependency management > -- > > Key: FLINK-26447 > URL: https://issues.apache.org/jira/browse/FLINK-26447 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator > Reporter: Gyula Fora > Assignee: Nicholas Jiang > Priority: Major > Labels: pull-request-available > > Currently the webhook module builds its own shaded jar, which includes the > contents of the operator shaded jar as well. > This is unnecessary and simply adds to the size of the image. > Operator dependencies should be in the provided scope, and the operator shaded > jar should simply be put on the classpath when the webhook starts.
[jira] [Comment Edited] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500479#comment-17500479 ] shizhengchao edited comment on FLINK-26033 at 3/3/22, 3:36 AM: --- [~MartijnVisser] [~renqs] I found through testing that the RoundRobinPartitioner built into Kafka does not work either: it cannot distribute writes equally across all partitions because of abortForNewBatch. For example, with 10 partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only sends data to the even partitions when abortForNewBatch is true. So we should implement a round-robin partitioner in Flink, and its design needs to discover partitions automatically. > In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it > does not take effect > -- > > Key: FLINK-26033 > URL: https://issues.apache.org/jira/browse/FLINK-26033 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3 > Reporter: shizhengchao > Assignee: shizhengchao > Priority: Major > Labels: pull-request-available
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500479#comment-17500479 ] shizhengchao commented on FLINK-26033: -- [~MartijnVisser] [~renqs] I found through testing that the RoundRobinPartitioner built into Kafka does not work either: it cannot distribute writes equally across all partitions because of abortForNewBatch. For example, with 10 partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only sends data to the even partitions when abortForNewBatch is true. So we should implement a round-robin partitioner in Flink, and its design needs to discover partitions automatically. > In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it > does not take effect > -- > > Key: FLINK-26033 > URL: https://issues.apache.org/jira/browse/FLINK-26033 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3 > Reporter: shizhengchao > Assignee: shizhengchao > Priority: Major > Labels: pull-request-available
[jira] [Commented] (FLINK-26454) Improve operator logging
[ https://issues.apache.org/jira/browse/FLINK-26454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500475#comment-17500475 ] Yang Wang commented on FLINK-26454: --- It seems that the MDC is a good solution. But I am not sure whether other users have some concerns that I am not aware of. > Improve operator logging > > > Key: FLINK-26454 > URL: https://issues.apache.org/jira/browse/FLINK-26454 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Major > > At the moment the way information is logged throughout the operator is very > inconsistent. Some parts log the name of the deployment, some the name + > namespace, some neither of these. > We should try to clean this up and unify it across the operator. > I see basically 2 possible ways: > 1. Add a log formatter utility to always attach name + namespace information > to each logged line > 2. Remove namespace + name from everywhere and extract this as part of the > logger setting from MDC information the operator sdk already provides > ([https://javaoperatorsdk.io/docs/features)] > We should discuss this on the mailing list as part of this work -- This message was sent by Atlassian Jira (v8.20.1#820001)
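To make option 2 from the issue description above more concrete, here is a rough, dependency-free sketch of the idea behind a mapped diagnostic context: the resource's namespace and name are stored once per thread and attached to every message. `ResourceLogContext` and its methods are hypothetical names for illustration only; a real setup would presumably rely on the logging framework's MDC and the values the operator SDK provides rather than this stand-in.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a logging framework's mapped diagnostic context. */
final class ResourceLogContext {
    // One context map per thread, so concurrent reconciliations do not mix up resources.
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(LinkedHashMap::new);

    private ResourceLogContext() {}

    /** Records the resource being processed on the current thread. */
    static void set(String namespace, String name) {
        Map<String, String> ctx = CONTEXT.get();
        ctx.put("namespace", namespace);
        ctx.put("name", name);
    }

    /** Clears the context; intended to be called in a finally block. */
    static void clear() {
        CONTEXT.remove();
    }

    /** Prefixes the message with [namespace/name] if a resource is set. */
    static String format(String message) {
        Map<String, String> ctx = CONTEXT.get();
        if (!ctx.containsKey("name")) {
            return message;
        }
        return "[" + ctx.get("namespace") + "/" + ctx.get("name") + "] " + message;
    }
}
```

With this shape, individual log statements no longer need to mention the deployment at all; only the entry point sets and clears the context.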
[GitHub] [flink-statefun] igalshilman commented on a change in pull request #306: [FLINK-25866][statefun] Support additional TLS configuration
igalshilman commented on a change in pull request #306: URL: https://github.com/apache/flink-statefun/pull/306#discussion_r818262446 ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplySpec.java ## @@ -76,14 +92,30 @@ public NettyRequestReplySpec( ofNullable(connectTimeout), () -> DEFAULT_CONNECT_TIMEOUT); this.pooledConnectionTTL = -ofNullable(pooledConnectionTTL).orElseGet(() -> DEFAULT_POOLED_CONNECTION_TTL); +ofNullable(pooledConnectionTTL).orElse(DEFAULT_POOLED_CONNECTION_TTL); this.connectionPoolMaxSize = ofNullable(connectionPoolMaxSize).orElse(DEFAULT_CONNECTION_POOL_MAX_SIZE); this.maxRequestOrResponseSizeInBytes = ofNullable(maxRequestOrResponseSizeInBytes) .orElse(DEFAULT_MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES); } + public Optional getTrustedCaCertsOptional() { Review comment: I think that the `Optional` suffix is redundant, since it is clear from the return type. ## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java ## @@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) { } channel.close().addListener(ignored -> pool.release(channel)); } + + private static SslContext getSslContext(NettyRequestReplySpec spec) { +Optional maybeTrustCaCerts = spec.getTrustedCaCertsOptional(); +Optional maybeClientCerts = spec.getClientCertsOptional(); +Optional maybeClientKey = spec.getClientKeyOptional(); +Optional maybeKeyPassword = spec.getClientKeyPasswordOptional(); + +if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) { + throw new IllegalStateException( + "You provided a client cert, but not a client key. Cannot continue."); +} +if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) { Review comment: This seems to me as a duplicate condition as the condition above. 
## File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java ## @@ -208,4 +225,77 @@ private void releaseChannel0(Channel channel) { } channel.close().addListener(ignored -> pool.release(channel)); } + + private static SslContext getSslContext(NettyRequestReplySpec spec) { +Optional maybeTrustCaCerts = spec.getTrustedCaCertsOptional(); +Optional maybeClientCerts = spec.getClientCertsOptional(); +Optional maybeClientKey = spec.getClientKeyOptional(); +Optional maybeKeyPassword = spec.getClientKeyPasswordOptional(); + +if (maybeClientCerts.isPresent() && !maybeClientKey.isPresent()) { + throw new IllegalStateException( + "You provided a client cert, but not a client key. Cannot continue."); +} +if (maybeClientKey.isPresent() && !maybeClientCerts.isPresent()) { + throw new IllegalStateException( + "You provided a client key, but not a client cert. Cannot continue."); +} + +Optional maybeTrustCaCertsInputStream = +maybeTrustCaCerts.map( +trustedCaCertsLocation -> +openStreamIfExistsOrThrow( + ResourceLocator.findNamedResource(trustedCaCertsLocation))); + +Optional maybeCertInputStream = +maybeClientCerts.map( +clientCertLocation -> + openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientCertLocation))); + +Optional maybeKeyInputStream = +maybeClientKey.map( +clientKeyLocation -> + openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientKeyLocation))); + +SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); +maybeTrustCaCertsInputStream.ifPresent(sslContextBuilder::trustManager); +maybeCertInputStream.ifPresent( +certInputStream -> +maybeKeyInputStream.ifPresent( Review comment: What do you think about strongly indicating that `maybeKeyInputStream` is mandatory, by calling `get` on It (and suppress a warning)? 
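As an illustration of the two review comments above (the paired cert/key check and calling `get` once the key is known to be mandatory), here is a minimal sketch using plain `Optional<String>` values. `ClientTlsMaterial` is a hypothetical class that is not part of the pull request, and the strings stand in for the resource locations handled by the real code.

```java
import java.util.Optional;

/** Hypothetical holder for a client cert/key pair; not part of the pull request. */
final class ClientTlsMaterial {
    final String cert;
    final String key;

    private ClientTlsMaterial(String cert, String key) {
        this.cert = cert;
        this.key = key;
    }

    /** Returns the pair if both are given, empty if neither is, and throws otherwise. */
    static Optional<ClientTlsMaterial> from(Optional<String> maybeCert, Optional<String> maybeKey) {
        // A single check covers both "cert without key" and "key without cert".
        if (maybeCert.isPresent() != maybeKey.isPresent()) {
            throw new IllegalStateException(
                    "A client cert and a client key must be provided together.");
        }
        // After the check above, the key is present whenever the cert is, so get() is safe.
        return maybeCert.map(cert -> new ClientTlsMaterial(cert, maybeKey.get()));
    }
}
```

Whether folding the two error messages into one is acceptable is a judgment call; the original code reports which of the two values is missing.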
## File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java ## @@ -20,33 +20,120 @@ import static org.apache.flink.statefun.e2e.smoke.java.Constants.CMD_INTERPRETER_FN; -import io.undertow.Undertow; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.*; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import
[GitHub] [flink] flinkbot edited a comment on pull request #18938: [FLINK-26403] SinkWriter should emit all the pending committables on endOfInput
flinkbot edited a comment on pull request #18938: URL: https://github.com/apache/flink/pull/18938#issuecomment-1055248048 ## CI report: * 6be86f37386c6962e3fbb6e7c2d7c506f44db30d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32408) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] shizhengchao commented on pull request #18863: [FLINK-26033][flink-connector-kafka]Fix the problem that robin does not take effect due to upgrading kafka client to 2.4.1 since Flink1.
shizhengchao commented on pull request #18863: URL: https://github.com/apache/flink/pull/18863#issuecomment-1057622360 @PatrickRen My testing shows that the RoundRobinPartitioner built into Kafka does not work either: it cannot distribute writes evenly across all partitions, because of abortForNewBatch. So this has to be redesigned for newer versions of the Kafka client.
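A minimal sketch of what a Flink-side round-robin choice could look like, assuming the caller invokes it exactly once per record and passes in the partitions it currently knows about. `RoundRobinChooser` is a hypothetical helper, not the implementation from the pull request, and it deliberately does not implement any Kafka or Flink interface.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: picks partitions in strict rotation, one counter per topic. */
final class RoundRobinChooser {
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    /**
     * Returns the next partition for the topic.
     *
     * @param topic the target topic
     * @param partitions the partition ids currently known for the topic (must be non-empty)
     */
    int next(String topic, int[] partitions) {
        if (partitions.length == 0) {
            throw new IllegalArgumentException("No partitions known for topic " + topic);
        }
        AtomicInteger counter = counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), partitions.length);
        return partitions[index];
    }
}
```

Because the partition array is supplied on every call, newly discovered partitions join the rotation as soon as the caller sees them.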
[jira] [Commented] (FLINK-26431) Release Testing: Exception history for the adaptive scheduler
[ https://issues.apache.org/jira/browse/FLINK-26431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500470#comment-17500470 ]

Liu commented on FLINK-26431:
-----------------------------

[~nsemmler] I do not have access to the project you mentioned, so I built my own job that fails every 30 seconds. After the job ran for some time, the exception and the exception history appeared in the web UI as you described. The test passed. Is there anything else to check? Thank you.

> Release Testing: Exception history for the adaptive scheduler
> -------------------------------------------------------------
>
> Key: FLINK-26431
> URL: https://issues.apache.org/jira/browse/FLINK-26431
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.15.0
> Reporter: Niklas Semmler
> Assignee: Liu
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.15.0
>
> FLINK-21439 adds support for the exception history to the adaptive scheduler.
> To evaluate the functionality, follow these steps:
> Add the following lines to your Flink config:
> {code:java}
> taskmanager.host: localhost # a workaround
> scheduler-mode: reactive
> restart-strategy: fixeddelay
> restart-strategy.fixed-delay.attempts: 100
> {code}
> Create the test job JAR and move it to the usrlib folder. (You may have to create it.) Here "$FLINK_HOME" refers to the folder of your Flink installation.
> {code:java}
> $ git clone g...@github.com:metaswirl/checkpoint-crasher-job.git
> $ cd checkpoint-crasher-job
> $ mvn install
> $ mv target/checkpoint-crasher-job-0.1.jar $FLINK_HOME/usrlib
> {code}
> Start the job:
> {code:java}
> $ bin/taskmanager.sh start
> $ bin/taskmanager.sh start
> $ bin/taskmanager.sh start
> $ bin/standalone-job.sh start --main-class org.apache.niklassemmler.flinkjobs.CheckpointCrasherJob
> {code}
> You should be able to find the exception `java.lang.RuntimeException: Test exception` in the web interface under Jobs->Running Jobs->Exceptions.
[jira] [Commented] (FLINK-25256) Savepoints do not work with ExternallyInducedSources
[ https://issues.apache.org/jira/browse/FLINK-25256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500468#comment-17500468 ]

Jiangjie Qin commented on FLINK-25256:
--------------------------------------

[~dwysakowicz] Pravega essentially uses an in-band checkpoint style. The checkpoint process is roughly the following:
# The Flink CheckpointCoordinator initiates the checkpoint and invokes MasterHooks.triggerHook(). The Pravega hook then tells the Pravega server that the Flink job has triggered a checkpoint.
# The Pravega server inserts checkpoint control messages into the data stream for each of the Pravega readers of the Flink job.
# When the Pravega readers see the checkpoint control messages, they trigger the Flink task checkpoint via the {{ExternallyInducedSource.CheckpointTrigger}}.

After FLIP-27, the SplitEnumerator can completely replace the MasterHook in the JM. But the Pravega connector still relies on the {{ExternallyInducedSource.CheckpointTrigger}} to perform the checkpoint in the subtasks.

Ultimately, the requirement is to manipulate the task based on some user-space records. A similar requirement is stopping the subtask when it sees a given message in the stream. What we need to think about is how many control plane actions we want to expose to users. So far the two actions we see are taking a checkpoint on the tasks and stopping the tasks, and for now such manipulation requirements exist only in the Source tasks. We can probably just make such record-driven task actions a more explicit primitive for users. For example, we can have an interface like {{TaskActionTrigger}}, which is passed to each piece of user logic and allows it to ask the task to take some action based on the records it has processed.

That said, I do think such control plane exposure should be minimal.
> Savepoints do not work with ExternallyInducedSources > > > Key: FLINK-25256 > URL: https://issues.apache.org/jira/browse/FLINK-25256 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.14.0, 1.13.3 >Reporter: Dawid Wysakowicz >Priority: Major > > It is not possible to take a proper savepoint with > {{ExternallyInducedSource}} or {{ExternallyInducedSourceReader}} (both legacy > and FLIP-27 versions). The problem is that we're hardcoding > {{CheckpointOptions}} in the {{triggerHook}}. > The outcome of current state is that operators would try to take checkpoints > in the checkpoint location whereas the {{CheckpointCoordinator}} will write > metadata for those states in the savepoint location. > Moreover the situation gets even weirder (I have not checked it entirely), if > we have a mixture of {{ExternallyInducedSource(s)}} and regular sources. In > such a case the location and format at which the state of a particular task > is persisted depends on the order of barriers arrival. If a barrier from a > regular source arrives last the task takes a savepoint, on the other hand if > last barrier is from an externally induced source it will take a checkpoint. -- This message was sent by Atlassian Jira (v8.20.1#820001)
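The {{TaskActionTrigger}} idea floated in the comment above could look roughly like the following sketch. Everything here is hypothetical: no such interface exists in Flink, the method names are invented for illustration, and the "CHECKPOINT:<id>" and "STOP" record formats are made up to stand in for connector-specific control messages.

```java
/** Hypothetical interface; not an existing Flink API. */
interface TaskActionTrigger {
    /** Asks the task to take a checkpoint with the given id. */
    void triggerCheckpoint(long checkpointId);

    /** Asks the task to stop. */
    void stopTask();
}

/** Hypothetical user logic that reacts to control records found in the stream. */
final class ControlRecordScanner {
    private static final String CHECKPOINT_PREFIX = "CHECKPOINT:";
    private static final String STOP_MARKER = "STOP";

    private final TaskActionTrigger trigger;

    ControlRecordScanner(TaskActionTrigger trigger) {
        this.trigger = trigger;
    }

    /** Returns true for ordinary data records and false for consumed control records. */
    boolean process(String record) {
        if (record.startsWith(CHECKPOINT_PREFIX)) {
            trigger.triggerCheckpoint(Long.parseLong(record.substring(CHECKPOINT_PREFIX.length())));
            return false;
        }
        if (record.equals(STOP_MARKER)) {
            trigger.stopTask();
            return false;
        }
        return true;
    }
}
```

Keeping the interface to these two methods mirrors the point that control plane exposure should stay minimal.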
[GitHub] [flink] flinkbot edited a comment on pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
flinkbot edited a comment on pull request #18965: URL: https://github.com/apache/flink/pull/18965#issuecomment-1057611030 ## CI report: * 868aa2c455dd900823290ba207edf2ef81c7d2d5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32464) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
flinkbot commented on pull request #18965: URL: https://github.com/apache/flink/pull/18965#issuecomment-1057611030 ## CI report: * 868aa2c455dd900823290ba207edf2ef81c7d2d5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #18655: [FLINK-25799] [docs] Translate table/filesystem.md page into Chinese.
flinkbot edited a comment on pull request #18655: URL: https://github.com/apache/flink/pull/18655#issuecomment-1032265310 ## CI report: * b6268adf337f01f20e13f8edd6e67608bb7f1afb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32414) * 9f3ccd717c3d292cefeb49da137d03afd2c6710c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32463) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Comment Edited] (FLINK-26400) Release Testing: Explicit shutdown signalling from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-26400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500466#comment-17500466 ] Zhu Zhu edited comment on FLINK-26400 at 3/3/22, 2:44 AM: -- Here's what I see on the page "Limited integration with Flink’s Web UI: Adaptive Scheduler allows that a job’s parallelism can change over its lifetime. The web UI only shows the current parallelism the job." Seems the two problems listed above are not described in the known limitation. So I think they need to be fixed. But I agree that they are not blockers of 1.15 because the problem has been there for some versions and are not blocker to users. Thanks for updating FLINK-22243. I have attached one picture to show problem #1. was (Author: zhuzh): Here's what I see on the page "Limited integration with Flink’s Web UI: Adaptive Scheduler allows that a job’s parallelism can change over its lifetime. The web UI only shows the current parallelism the job." Seems the two problems listed above are not described in the known limitation. So I think they need to be fixed. But I agree that they are not blockers of 1.15 because the problem has been there for some versions and not blocker for users. Thanks for updating FLINK-22243. I have attached one picture to show problem #1. > Release Testing: Explicit shutdown signalling from TaskManager to JobManager > > > Key: FLINK-26400 > URL: https://issues.apache.org/jira/browse/FLINK-26400 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.15.0 >Reporter: Niklas Semmler >Assignee: Zhu Zhu >Priority: Blocker > Labels: release-testing > Fix For: 1.15.0 > > Attachments: errors_on_opening_job_page_when_job_gets_no_resources.png > > > FLINK-25277 introduces explicit signalling between a TaskManager and the > JobManager when the TaskManager shuts down. This reduces the time it takes > for a reactive cluster to down-scale & restart. 
> > *Setup* > # Add the following line to your flink config to enable reactive mode: > {code} > taskmanager.host: localhost # a workaround > scheduler-mode: reactive > restart-strategy: fixeddelay > restart-strategy.fixed-delay.attempts: 100 > {code} > # Create a “usrlib” folder and place the TopSpeedWindowing jar into it > {code:bash} > $ mkdir usrlib > $ cp examples/streaming/TopSpeedWindowing.jar usrlib/ > {code} > # Start the job > {code:bash} > $ bin/standalone-job.sh start --main-class > org.apache.flink.streaming.examples.windowing.TopSpeedWindowing > {code} > # Start three task managers > {code:bash} > $ bin/taskmanager.sh start > $ bin/taskmanager.sh start > $ bin/taskmanager.sh start > {code} > # Wait for the job to stabilize. The log file should show that three tasks > start for every operator. > {code} > GlobalWindows -> Sink: Print to Std. Out (3/3) > (d10339d5755d07f3d9864ed1b2147af2) switched from INITIALIZING to > RUNNING.{code} > *Test* > Stop one taskmanager > {code:bash} > $ bin/taskmanager.sh stop > {code} > Success condition: You should see that the job cancels and re-runs after a > few seconds. In the logs you should find a line with the text “The > TaskExecutor is shutting down”. > *Teardown* > Stop all taskmanagers and the jobmanager: > {code:bash} > $ bin/standalone-job.sh stop > $ bin/taskmanager.sh stop-all > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26400) Release Testing: Explicit shutdown signalling from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-26400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500466#comment-17500466 ] Zhu Zhu commented on FLINK-26400: - Here's what I see on the page "Limited integration with Flink’s Web UI: Adaptive Scheduler allows that a job’s parallelism can change over its lifetime. The web UI only shows the current parallelism the job." Seems the two problems listed above are not described in the known limitation. So I think they need to be fixed. But I agree that they are not blockers of 1.15 because the problem has been there for some versions and not blocker for users. Thanks for updating FLINK-22243. I have attached one picture to show problem #1. > Release Testing: Explicit shutdown signalling from TaskManager to JobManager > > > Key: FLINK-26400 > URL: https://issues.apache.org/jira/browse/FLINK-26400 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.15.0 >Reporter: Niklas Semmler >Assignee: Zhu Zhu >Priority: Blocker > Labels: release-testing > Fix For: 1.15.0 > > Attachments: errors_on_opening_job_page_when_job_gets_no_resources.png > > > FLINK-25277 introduces explicit signalling between a TaskManager and the > JobManager when the TaskManager shuts down. This reduces the time it takes > for a reactive cluster to down-scale & restart. 
> > *Setup* > # Add the following line to your flink config to enable reactive mode: > {code} > taskmanager.host: localhost # a workaround > scheduler-mode: reactive > restart-strategy: fixeddelay > restart-strategy.fixed-delay.attempts: 100 > {code} > # Create a “usrlib” folder and place the TopSpeedWindowing jar into it > {code:bash} > $ mkdir usrlib > $ cp examples/streaming/TopSpeedWindowing.jar usrlib/ > {code} > # Start the job > {code:bash} > $ bin/standalone-job.sh start --main-class > org.apache.flink.streaming.examples.windowing.TopSpeedWindowing > {code} > # Start three task managers > {code:bash} > $ bin/taskmanager.sh start > $ bin/taskmanager.sh start > $ bin/taskmanager.sh start > {code} > # Wait for the job to stabilize. The log file should show that three tasks > start for every operator. > {code} > GlobalWindows -> Sink: Print to Std. Out (3/3) > (d10339d5755d07f3d9864ed1b2147af2) switched from INITIALIZING to > RUNNING.{code} > *Test* > Stop one taskmanager > {code:bash} > $ bin/taskmanager.sh stop > {code} > Success condition: You should see that the job cancels and re-runs after a > few seconds. In the logs you should find a line with the text “The > TaskExecutor is shutting down”. > *Teardown* > Stop all taskmanagers and the jobmanager: > {code:bash} > $ bin/standalone-job.sh stop > $ bin/taskmanager.sh stop-all > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] flinkbot edited a comment on pull request #18655: [FLINK-25799] [docs] Translate table/filesystem.md page into Chinese.
flinkbot edited a comment on pull request #18655: URL: https://github.com/apache/flink/pull/18655#issuecomment-1032265310 ## CI report: * b6268adf337f01f20e13f8edd6e67608bb7f1afb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32414) * 9f3ccd717c3d292cefeb49da137d03afd2c6710c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-26400) Release Testing: Explicit shutdown signalling from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-26400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-26400: Attachment: errors_on_opening_job_page_when_job_gets_no_resources.png > Release Testing: Explicit shutdown signalling from TaskManager to JobManager > > > Key: FLINK-26400 > URL: https://issues.apache.org/jira/browse/FLINK-26400 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.15.0 >Reporter: Niklas Semmler >Assignee: Zhu Zhu >Priority: Blocker > Labels: release-testing > Fix For: 1.15.0 > > Attachments: errors_on_opening_job_page_when_job_gets_no_resources.png > > > FLINK-25277 introduces explicit signalling between a TaskManager and the > JobManager when the TaskManager shuts down. This reduces the time it takes > for a reactive cluster to down-scale & restart. > > *Setup* > # Add the following line to your flink config to enable reactive mode: > {code} > taskmanager.host: localhost # a workaround > scheduler-mode: reactive > restart-strategy: fixeddelay > restart-strategy.fixed-delay.attempts: 100 > {code} > # Create a “usrlib” folder and place the TopSpeedWindowing jar into it > {code:bash} > $ mkdir usrlib > $ cp examples/streaming/TopSpeedWindowing.jar usrlib/ > {code} > # Start the job > {code:bash} > $ bin/standalone-job.sh start --main-class > org.apache.flink.streaming.examples.windowing.TopSpeedWindowing > {code} > # Start three task managers > {code:bash} > $ bin/taskmanager.sh start > $ bin/taskmanager.sh start > $ bin/taskmanager.sh start > {code} > # Wait for the job to stabilize. The log file should show that three tasks > start for every operator. > {code} > GlobalWindows -> Sink: Print to Std. 
Out (3/3) > (d10339d5755d07f3d9864ed1b2147af2) switched from INITIALIZING to > RUNNING.{code} > *Test* > Stop one taskmanager > {code:bash} > $ bin/taskmanager.sh stop > {code} > Success condition: You should see that the job cancels and re-runs after a > few seconds. In the logs you should find a line with the text “The > TaskExecutor is shutting down”. > *Teardown* > Stop all taskmanagers and the jobmanager: > {code:bash} > $ bin/standalone-job.sh stop > $ bin/taskmanager.sh stop-all > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (FLINK-26435) Provide the container name for CI debug log
[ https://issues.apache.org/jira/browse/FLINK-26435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Wang closed FLINK-26435.
-----------------------------
Resolution: Fixed

Fixed via:
main: 47f02cb7e0318dee16d321c91867b9a94337b9da

> Provide the container name for CI debug log
> -------------------------------------------
>
> Key: FLINK-26435
> URL: https://issues.apache.org/jira/browse/FLINK-26435
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Reporter: Aitozi
> Assignee: Aitozi
> Priority: Major
> Labels: pull-request-available
>
> The kubectl logs call used in CI does not specify the container name, so it fails as shown below:
>
> {code:java}
> Flink logs:
> Current logs for flink-operator-6c66c5-9ptqw:
> error: a container name must be specified for pod flink-operator-6c66c5-9ptqw, choose one of: [flink-operator flink-webhook]
> {code}
[GitHub] [flink] PatrickRen opened a new pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests
PatrickRen opened a new pull request #18965:
URL: https://github.com/apache/flink/pull/18965

## What is the purpose of the change

This pull request fixes a flaky broker failure test, which should use a multi-broker cluster so that partitions are replicated during the test run.

## Brief change log

- Use individual multi-broker cluster in broker failure tests
- Re-enable ignored tests

## Verifying this change

This change is already covered by existing broker failure tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[jira] [Assigned] (FLINK-26447) Clean up webhook jar and dependency management
[ https://issues.apache.org/jira/browse/FLINK-26447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang reassigned FLINK-26447: - Assignee: Nicholas Jiang (was: Yang Wang) > Clean up webhook jar and dependency management > -- > > Key: FLINK-26447 > URL: https://issues.apache.org/jira/browse/FLINK-26447 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Nicholas Jiang >Priority: Major > > Currently the webhook module builds its own shaded jar, which also includes the > contents of the operator shaded jar. > This is unnecessary and simply adds to the size of the image. > Operator dependencies should be in the provided scope, and the operator shaded > jar should simply be put on the classpath when the webhook starts.
[jira] [Updated] (FLINK-26444) Window allocator supporting pyflink datastream API
[ https://issues.apache.org/jira/browse/FLINK-26444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-26444: Fix Version/s: 1.16.0 > Window allocator supporting pyflink datastream API > -- > > Key: FLINK-26444 > URL: https://issues.apache.org/jira/browse/FLINK-26444 > Project: Flink > Issue Type: New Feature > Components: API / Python >Affects Versions: 1.14.3 >Reporter: zhangjingcun >Assignee: zhangjingcun >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > PyFlink DataStream API: > Window operations in the PyFlink DataStream API currently have to be > implemented with custom windows, which makes them harder to use. Therefore, > implementation classes for tumbling, sliding, session, and counting windows > are provided, so that developers can use these classes directly.
[jira] [Assigned] (FLINK-26444) Window allocator supporting pyflink datastream API
[ https://issues.apache.org/jira/browse/FLINK-26444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-26444: --- Assignee: zhangjingcun > Window allocator supporting pyflink datastream API > -- > > Key: FLINK-26444 > URL: https://issues.apache.org/jira/browse/FLINK-26444 > Project: Flink > Issue Type: New Feature > Components: API / Python >Affects Versions: 1.14.3 >Reporter: zhangjingcun >Assignee: zhangjingcun >Priority: Major > Labels: pull-request-available > > PyFlink DataStream API: > Window operations in the PyFlink DataStream API currently have to be > implemented with custom windows, which makes them harder to use. Therefore, > implementation classes for tumbling, sliding, session, and counting windows > are provided, so that developers can use these classes directly.
[jira] [Closed] (FLINK-25152) FLIP-188: Introduce Built-in Dynamic Table Storage
[ https://issues.apache.org/jira/browse/FLINK-25152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee closed FLINK-25152. Resolution: Fixed > FLIP-188: Introduce Built-in Dynamic Table Storage > -- > > Key: FLINK-25152 > URL: https://issues.apache.org/jira/browse/FLINK-25152 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API, Table SQL / Ecosystem, Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Fix For: 1.15.0 > > > Introduce built-in storage support for dynamic tables: a truly unified > changelog and table representation from Flink SQL’s perspective. The storage > will improve usability considerably. > For more details, see: > [https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage]
[jira] [Assigned] (FLINK-26293) Store log offsets in snapshot
[ https://issues.apache.org/jira/browse/FLINK-26293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-26293: Assignee: Nicholas Jiang > Store log offsets in snapshot > - > > Key: FLINK-26293 > URL: https://issues.apache.org/jira/browse/FLINK-26293 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Reporter: Jingsong Lee >Assignee: Nicholas Jiang >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.1.0 > > > Now we have put logOffsets inside ManifestCommittable, but it is not saved > when generating snapshot in FileStoreCommitImpl. > We need to save the logOffsets in the snapshot and some additional processing > is required: > When logOffsets does not contain all buckets, it means that the current > generated snapshot does not write data to all buckets, and we should read the > last snapshot to complete the bucket's offsets. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #31: [FLINK-26293] Store log offsets in snapshot
JingsongLi commented on a change in pull request #31: URL: https://github.com/apache/flink-table-store/pull/31#discussion_r818263016 ## File path: flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java ## @@ -150,9 +150,12 @@ public void testOverwritePartialCommit() throws Exception { TestFileStore store = createStore(false); store.commitData( - data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), -gen::getPartition, -kv -> 0); +data1.values().stream() +.flatMap(Collection::stream) +.collect(Collectors.toList()), +gen::getPartition, +kv -> 0) +.forEach(snapshot -> assertThat(snapshot.getLogOffsets().get(0)).isEqualTo(1)); Review comment: I didn't quite understand your test. As I understand it, the test logic should be: - commit with full log offsets, assert. - commit with partial log offsets (Complemented by the previous snapshot), assert. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #31: [FLINK-26293] Store log offsets in snapshot
JingsongLi commented on a change in pull request #31: URL: https://github.com/apache/flink-table-store/pull/31#discussion_r818260657 ## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java ## @@ -276,7 +273,7 @@ private void tryOverwrite( private boolean tryCommitOnce( List changes, -String identifier, +ManifestCommittable committable, Review comment: I would prefer to pass `identifier` and `logOffsets` here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #31: [FLINK-26293] Store log offsets in snapshot
JingsongLi commented on a change in pull request #31: URL: https://github.com/apache/flink-table-store/pull/31#discussion_r818260224 ## File path: flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java ## @@ -304,10 +301,17 @@ private boolean tryCommitOnce( String manifestListName = null; List oldMetas = new ArrayList<>(); List newMetas = new ArrayList<>(); +Map logOffsets = committable.logOffsets(); try { if (latestSnapshot != null) { // read all previous manifest files oldMetas.addAll(manifestList.read(latestSnapshot.manifestList())); +// read the last snapshot to complete the bucket's offsets when logOffsets does not +// contain all buckets +logOffsets.putAll( Review comment: `latestSnapshot.getLogOffsets().forEach(logOffsets::putIfAbsent)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
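The difference between the `putAll` call in the diff and the suggested `putIfAbsent` loop can be sketched in isolation. The following is a minimal, standalone illustration, not code from the pull request: the class name `LogOffsetMerge`, the method `complete`, and the `Map<Integer, Long>` bucket-to-offset type are assumptions made for the example (the generic types are not visible in the quoted diff).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of flink-table-store.
public class LogOffsetMerge {

    /**
     * Returns the offsets to store in a new snapshot. Buckets written by the
     * current commit keep their new offset; buckets missing from the commit
     * are filled in from the previous snapshot.
     */
    public static Map<Integer, Long> complete(
            Map<Integer, Long> committed, Map<Integer, Long> previous) {
        Map<Integer, Long> result = new HashMap<>(committed);
        // putIfAbsent only adds buckets that the commit did not touch.
        previous.forEach(result::putIfAbsent);
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> previous = Map.of(0, 1L, 1, 4L); // last snapshot
        Map<Integer, Long> committed = Map.of(0, 9L);       // partial commit

        // Bucket 0 keeps the new offset, bucket 1 is taken from the last snapshot.
        System.out.println(complete(committed, previous));

        // By contrast, putAll lets the previous snapshot overwrite bucket 0.
        Map<Integer, Long> overwritten = new HashMap<>(committed);
        overwritten.putAll(previous);
        System.out.println(overwritten);
    }
}
```

With `putAll`, an offset from the previous snapshot would replace a newer offset for the same bucket, which is presumably why the reviewer suggests `putIfAbsent`.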
[jira] [Commented] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI
[ https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500462#comment-17500462 ] zlzhang0122 commented on FLINK-25068: - [~airblader] can you help to review the PR? > Show the maximum parallelism (number of key groups) of a job in Web UI > -- > > Key: FLINK-25068 > URL: https://issues.apache.org/jira/browse/FLINK-25068 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 1.14.0 >Reporter: zlzhang0122 >Assignee: zlzhang0122 >Priority: Major > Labels: pull-request-available > > Currently, Flink uses the maximum parallelism as the number of key groups over > which keys are distributed. The maximum parallelism can be set manually, or Flink sets it > automatically. The value is sometimes useful to know, so > we could expose it in the Web UI. > With it, we can easily tell users the maximum parallelism they can > scale to when they face a lack of throughput, and we can work out > which subtask processes a particular value and find the relevant log quickly. -- This message was sent by Atlassian Jira (v8.20.1#820001)
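To make the relationship between maximum parallelism, key groups, and subtasks concrete, here is a small standalone sketch. It is a simplification under stated assumptions: the class `KeyGroupSketch` and both method names are hypothetical, and `Math.floorMod` over `hashCode()` stands in for the hash function Flink applies to keys. The subtask formula (`keyGroup * parallelism / maxParallelism`) follows how Flink maps key groups to operator indices as far as I know, but treat it as illustrative rather than authoritative.

```java
// Hypothetical sketch, for illustration only; not Flink's actual implementation.
public class KeyGroupSketch {

    /** Key group of a key: a hash of the key modulo the maximum parallelism. */
    public static int keyGroupFor(Object key, int maxParallelism) {
        // Assumption: a plain floorMod replaces Flink's own hashing of hashCode().
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the subtask that owns a key group at the given parallelism. */
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups
        int parallelism = 4;      // current number of subtasks

        int keyGroup = keyGroupFor("user-42", maxParallelism);
        int subtask = subtaskFor(keyGroup, maxParallelism, parallelism);
        System.out.println("key group " + keyGroup + " -> subtask " + subtask);
    }
}
```

Because the number of key groups is fixed by the maximum parallelism, a job cannot usefully scale beyond that value, and knowing it lets an operator work out which subtask handled a given key.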
[GitHub] [flink] flinkbot edited a comment on pull request #18938: [FLINK-26403] SinkWriter should emit all the pending committables on endOfInput
flinkbot edited a comment on pull request #18938: URL: https://github.com/apache/flink/pull/18938#issuecomment-1055248048 ## CI report: * 6be86f37386c6962e3fbb6e7c2d7c506f44db30d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32408) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-26424) RequestTimeoutException
[ https://issues.apache.org/jira/browse/FLINK-26424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500460#comment-17500460 ] xinchenyuan commented on FLINK-26424: - :) I really hope we can reach a conclusion on this. Online serving must be more stable. The complexity on the function side and the resilience on the runtime side can both be addressed; it need not be a tradeoff. > RequestTimeoutException > > > Key: FLINK-26424 > URL: https://issues.apache.org/jira/browse/FLINK-26424 > Project: Flink > Issue Type: Bug > Components: Stateful Functions >Affects Versions: statefun-3.2.0 >Reporter: xinchenyuan >Priority: Major > > There is no max-retries setting; all I have is the call timeout. > As the doc says ([Transport Spec|https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/modules/http-endpoint/#transport-1]), a call > fails after the timeout. > But when the exception is raised, the runtime restarts. I'm confused why a function-internal > error causes such a big problem. Will MAX RETRIES become a > configurable param? > > 2022-02-28 17:58:32 > org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: > An error occurred when attempting to invoke function FunctionType(tendoc, > AlertNotificationIngressCkafka). 
> at > org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50) > at > org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:73) > at > org.apache.flink.statefun.flink.core.functions.FunctionActivation.applyNextPendingEnvelope(FunctionActivation.java:50) > at > org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:61) > at > org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164) > at > org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86) > at > org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:88) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:89) > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) > at > org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:186) > at > org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) > at > 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.base/java.lang.Thread.run(Unknown Source) > Caused by: java.lang.IllegalStateException: Failure forwarding a message to a > remote function Address(tendoc, AlertNotificationIngressCkafka, cls-message) > at > org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170) > at >
[GitHub] [flink] gaoyunhaii commented on pull request #18938: [FLINK-26403] SinkWriter should emit all the pending committables on endOfInput
gaoyunhaii commented on pull request #18938: URL: https://github.com/apache/flink/pull/18938#issuecomment-1057588616 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-26437) Cannot discover a connector using option: 'connector'='jdbc'
[ https://issues.apache.org/jira/browse/FLINK-26437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500459#comment-17500459 ] Yuan Zhu commented on FLINK-26437: -- In my local env(wsl-ubuntu-20.04), I can run "sql-client.sh embedded" with jars, which are listed below, in flink lib. !image-2022-03-03-10-03-50-763.png! And the sql is almost like yours: {code:java} CREATE TABLE source ( user_id varchar, item_id varchar, category_id varchar, behavior varchar, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'test_json', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'localhost:9392', 'format' = 'json', 'json.ignore-parse-errors' = 'true', 'properties.group.id' = 'test'); CREATE TABLE user_details_fs WITH ( 'connector' = 'filesystem', 'path' = 'file:///mnt/c/Users/zhuyuan/Desktop/', 'format' = 'parquet' ) LIKE source (EXCLUDING ALL); insert into user_details_fs select * from source;{code} > Cannot discover a connector using option: 'connector'='jdbc' > > > Key: FLINK-26437 > URL: https://issues.apache.org/jira/browse/FLINK-26437 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.13.6 >Reporter: Arindam Bhattacharjee >Priority: Major > Labels: sql-api, table-api > Attachments: image-2022-03-03-10-03-50-763.png > > Original Estimate: 24h > Remaining Estimate: 24h > > Hi Team, > When I was running SQL in Flink SQL-API, was getting the below error - > *Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a > connector using option: 'connector'='jdbc'* > at > org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467) > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441) > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:167) > ... 
32 more > Caused by: org.apache.flink.table.api.ValidationException: Could not find any > factory for identifier 'jdbc' that implements > 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. > Available factory identifiers are: > blackhole > datagen > filesystem > kafka > print > upsert-kafka > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319) > at > org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463) > ... 34 more > > > SQL I was using - > _CREATE TABLE pvuv_sink (_ > _dt varchar PRIMARY KEY,_ > _pv BIGINT,_ > _uv BIGINT_ > _) WITH (_ > _'connector' = 'jdbc',_ > _'url' = 'jdbc:mysql://localhost:3306/flinksql_test',_ > _'table-name' = 'pvuv_sink',_ > _'username' = 'root',_ > _'password' = 'xx',_ > _'sink.buffer-flush.max-rows' = '1'_ > _);_ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-26437) Cannot discover a connector using option: 'connector'='jdbc'
[ https://issues.apache.org/jira/browse/FLINK-26437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuan Zhu updated FLINK-26437: - Attachment: image-2022-03-03-10-03-50-763.png > Cannot discover a connector using option: 'connector'='jdbc' > > > Key: FLINK-26437 > URL: https://issues.apache.org/jira/browse/FLINK-26437 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.13.6 >Reporter: Arindam Bhattacharjee >Priority: Major > Labels: sql-api, table-api > Attachments: image-2022-03-03-10-03-50-763.png > > Original Estimate: 24h > Remaining Estimate: 24h > > Hi Team, > When I was running SQL in Flink SQL-API, was getting the below error - > *Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a > connector using option: 'connector'='jdbc'* > at > org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467) > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441) > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:167) > ... 32 more > Caused by: org.apache.flink.table.api.ValidationException: Could not find any > factory for identifier 'jdbc' that implements > 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. > Available factory identifiers are: > blackhole > datagen > filesystem > kafka > print > upsert-kafka > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319) > at > org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463) > ... 
34 more > > > SQL I was using - > _CREATE TABLE pvuv_sink (_ > _dt varchar PRIMARY KEY,_ > _pv BIGINT,_ > _uv BIGINT_ > _) WITH (_ > _'connector' = 'jdbc',_ > _'url' = 'jdbc:mysql://localhost:3306/flinksql_test',_ > _'table-name' = 'pvuv_sink',_ > _'username' = 'root',_ > _'password' = 'xx',_ > _'sink.buffer-flush.max-rows' = '1'_ > _);_ -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (FLINK-26447) Clean up webhook jar and dependency management
[ https://issues.apache.org/jira/browse/FLINK-26447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang reassigned FLINK-26447: - Assignee: Yang Wang > Clean up webhook jar and dependency management > -- > > Key: FLINK-26447 > URL: https://issues.apache.org/jira/browse/FLINK-26447 > Project: Flink > Issue Type: Sub-task > Components: Kubernetes Operator >Reporter: Gyula Fora >Assignee: Yang Wang >Priority: Major > > Currently the webhook module builds its own shaded jar which includes the > operator shaded jar contents as well. > This is unnecessary and simply adds to the size of the image. > Operator dependencies should be in the provided scope and the operator shaded > jar simply put on the classpath when the webhook starts. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26456) StreamExecutionEnvironmentTests::test_add_python_file failed with ModuleNotFoundError
Huang Xingbo created FLINK-26456: Summary: StreamExecutionEnvironmentTests::test_add_python_file failed with ModuleNotFoundError Key: FLINK-26456 URL: https://issues.apache.org/jira/browse/FLINK-26456 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.0 Reporter: Huang Xingbo {code:java} 2022-03-02T16:33:43.6649755Z Mar 02 16:33:43 Traceback (most recent call last): 2022-03-02T16:33:43.6650839Z Mar 02 16:33:43 File "/__w/2/s/flink-python/.tox/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute 2022-03-02T16:33:43.6651405Z Mar 02 16:33:43 response = task() 2022-03-02T16:33:43.6652257Z Mar 02 16:33:43 File "/__w/2/s/flink-python/.tox/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in 2022-03-02T16:33:43.6652846Z Mar 02 16:33:43 lambda: self.create_worker().do_instruction(request), request) 2022-03-02T16:33:43.6653655Z Mar 02 16:33:43 File "/__w/2/s/flink-python/.tox/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction 2022-03-02T16:33:43.6654200Z Mar 02 16:33:43 return getattr(self, request_type)( 2022-03-02T16:33:43.6654969Z Mar 02 16:33:43 File "/__w/2/s/flink-python/.tox/py38/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle 2022-03-02T16:33:43.6655549Z Mar 02 16:33:43 bundle_processor.process_bundle(instruction_id)) 2022-03-02T16:33:43.6656337Z Mar 02 16:33:43 File "/__w/2/s/flink-python/.tox/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle 2022-03-02T16:33:43.6656936Z Mar 02 16:33:43 input_op_by_transform_id[element.transform_id].process_encoded( 2022-03-02T16:33:43.6657773Z Mar 02 16:33:43 File "/__w/2/s/flink-python/.tox/py38/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded 2022-03-02T16:33:43.6658326Z Mar 02 16:33:43 
self.output(decoded_value) 2022-03-02T16:33:43.6658829Z Mar 02 16:33:43 File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output 2022-03-02T16:33:43.6659460Z Mar 02 16:33:43 File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output 2022-03-02T16:33:43.6660261Z Mar 02 16:33:43 File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive 2022-03-02T16:33:43.6661064Z Mar 02 16:33:43 File "apache_beam/runners/worker/operations.py", line 319, in apache_beam.runners.worker.operations.Operation.process 2022-03-02T16:33:43.6661902Z Mar 02 16:33:43 File "/__w/2/s/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py", line 132, in process 2022-03-02T16:33:43.6662440Z Mar 02 16:33:43 self._output_processor.process_outputs(o, self.process_element(value)) 2022-03-02T16:33:43.6663200Z Mar 02 16:33:43 File "/__w/2/s/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py", line 63, in process_outputs 2022-03-02T16:33:43.6663750Z Mar 02 16:33:43 self._consumer.process(windowed_value.with_value(results)) 2022-03-02T16:33:43.6664475Z Mar 02 16:33:43 File "/__w/2/s/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py", line 131, in process 2022-03-02T16:33:43.6664969Z Mar 02 16:33:43 for value in o.value: 2022-03-02T16:33:43.6665754Z Mar 02 16:33:43 File "/__w/2/s/flink-python/pyflink/fn_execution/datastream/operations.py", line 179, in wrapped_func 2022-03-02T16:33:43.280Z Mar 02 16:33:43 yield from _emit_results(timestamp, watermark, results) 2022-03-02T16:33:43.6667010Z Mar 02 16:33:43 File "/__w/2/s/flink-python/pyflink/fn_execution/datastream/input_handler.py", line 101, in _emit_results 2022-03-02T16:33:43.6667492Z Mar 02 16:33:43 for result in results: 2022-03-02T16:33:43.6668154Z Mar 02 16:33:43 File "/__w/2/s/flink-python/pyflink/datastream/data_stream.py", line 
271, in process_element 2022-03-02T16:33:43.6668626Z Mar 02 16:33:43 yield self._map_func(value) 2022-03-02T16:33:43.6669335Z Mar 02 16:33:43 File "/__w/2/s/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py", line 350, in plus_two_map 2022-03-02T16:33:43.6669848Z Mar 02 16:33:43 from test_dep1 import add_two 2022-03-02T16:33:43.6670561Z Mar 02 16:33:43 ModuleNotFoundError: No module named 'test_dep1' {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32448=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [flink] KarmaGYZ commented on pull request #18959: [hotfix][runtime] Remove unused ResourceManagerServices
KarmaGYZ commented on pull request #18959: URL: https://github.com/apache/flink/pull/18959#issuecomment-1057583820 @xintongsong -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on pull request #18224: [FLINK-25143][test] Add ITCase for periodic materialization
rkhachatryan commented on pull request #18224: URL: https://github.com/apache/flink/pull/18224#issuecomment-1057580942 One more thing: can the production (DFS-based) changelog implementation be used in this test? (`FsStateChangelogStorageFactory.configure`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a change in pull request #18224: [FLINK-25143][test] Add ITCase for periodic materialization
rkhachatryan commented on a change in pull request #18224: URL: https://github.com/apache/flink/pull/18224#discussion_r818237157 ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationITCase +extends ChangelogPeriodicMaterializationTestBase { + +public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { +super(delegatedStateBackend); +} + +@Before +public void setup() throws Exception { +super.setup(); +} Review comment: Is this override necessary? ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase +extends ChangelogPeriodicMaterializationTestBase { + +public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { +super(delegatedStateBackend); +} + +@Before +public void setup() throws Exception { +super.setup(); +} + +/** Recovery from checkpoint only containing non-materialized state. */ +@Test +public void testNonMaterialization() throws Exception { +StreamExecutionEnvironment env = +getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); +waitAndAssert( +buildJobGraph( +env, +new ControlledSource() { +@Override +protected void beforeElement(SourceContext ctx) +throws Exception { +if (getRuntimeContext().getAttemptNumber() == 0 +&& currentIndex == TOTAL_ELEMENTS / 2) { +
[GitHub] [flink] rkhachatryan commented on a change in pull request #18224: [FLINK-25143][test] Add ITCase for periodic materialization
rkhachatryan commented on a change in pull request #18224: URL: https://github.com/apache/flink/pull/18224#discussion_r818232919 ## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.io.File; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; + +import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint; +import static org.junit.Assert.assertTrue; + +/** + * This verifies that rescale works correctly for Changelog state backend with materialized state / + * non-materialized state. + */ +public class ChangelogPeriodicMaterializationRescaleITCase +extends ChangelogPeriodicMaterializationTestBase { + +public ChangelogPeriodicMaterializationRescaleITCase( +AbstractStateBackend delegatedStateBackend) { +super(delegatedStateBackend); +} + +@Test +public void testRescaleOut() throws Exception { +testRescale(NUM_SLOTS / 2, NUM_SLOTS); +} + +@Test +public void testRescaleIn() throws Exception { +testRescale(NUM_SLOTS, NUM_SLOTS / 2); +} + +private void testRescale(int firstParallelism, int secondParallelism) throws Exception { Review comment: Sure, I've found two issues: 1. https://issues.apache.org/jira/browse/FLINK-26455 which causes `CancellationException` 2. 
Unaligned checkpoints have some [limitations](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/unaligned_checkpoints/#certain-data-distribution-patterns-are-not-checkpointed) with rescaling which fails the recovery. I think we can simply disable UC for now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org