[jira] [Created] (FLINK-29728) TablePlanner prevents Flink from starting if the working directory is a symbolic link
Angelo Kastroulis created FLINK-29728:
-----------------------------------------

             Summary: TablePlanner prevents Flink from starting if the working directory is a symbolic link
                 Key: FLINK-29728
                 URL: https://issues.apache.org/jira/browse/FLINK-29728
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.15.2
            Reporter: Angelo Kastroulis

The Flink runtime throws an exception when using the Table API if the working directory is a symbolic link. This is the case when run on AWS EMR with Yarn. There is a similar issue [here|https://issues.apache.org/jira/browse/FLINK-20267], and I believe the same fix applied there would work.

{code:java}
Caused by: org.apache.flink.table.api.TableException: Could not initialize the table planner components loader.
    at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:123) ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
    at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:52) ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
    at org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.<clinit>(PlannerModule.java:131) ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
    at org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135) ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
    at org.apache.flink.table.planner.loader.DelegateExecutorFactory.<init>(DelegateExecutorFactory.java:34) ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_342]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_342]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_342]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_342]
    at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_342]
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) ~[?:1.8.0_342]
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) ~[?:1.8.0_342]
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_342]
    at org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42) ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
    at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798) ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517) ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276) ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
    at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93) ~[flink-table-api-java-uber-1.15.1.jar:1.15.1]
    at com.ballista.Hermes.BCSE$.useLocalCatalog(BCSE.scala:210) ~[?:?]
    at com.ballista.Hermes.BCSE$.main(BCSE.scala:114) ~[?:?]
    at com.ballista.Hermes.BCSE.main(BCSE.scala) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_342]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_342]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_342]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.1.jar:1.15.1]
    ... 7 more
Caused by: java.nio.file.FileAlreadyExistsException: /tmp
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) ~[?:1.8.0_342]
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_342]
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_342]
    at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) ~[?:1.8.0_342]
    at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_342]
    at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) ~[?:1.8.0_342]
    at java.nio.file.Files.createDirectories(Files.java:727) ~[?:1.8.0_342]
    at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:96) ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
    at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:52) ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
    at org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.<clinit>(PlannerModule.java:131) ~[flink-table-planner-loader-1.15.1.jar:1.15.1]
    at org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135)
{code}
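The underlying JDK behavior reproduces outside Flink: `Files.createDirectories` throws `FileAlreadyExistsException` when the target path is a symbolic link to an existing directory, because its existence check does not follow links. A minimal stand-alone sketch of the failure and of the resolve-the-real-path workaround (the temp-directory layout is illustrative, and `toRealPath()` mirrors the approach taken in FLINK-20267 rather than quoting the actual patch):

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SymlinkCreateDirectoriesDemo {
    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("flink-29728-demo");
        Path real = Files.createDirectory(base.resolve("real"));
        // A symlink pointing at the directory, like a symlinked working dir on EMR/Yarn.
        Path link = Files.createSymbolicLink(base.resolve("link"), real);

        // createDirectories is idempotent on a real directory: no exception.
        Files.createDirectories(real);

        // On the symlink, createDirectories catches the EEXIST error, re-checks the
        // path with NOFOLLOW_LINKS semantics, decides "exists but is not a
        // directory", and rethrows FileAlreadyExistsException.
        boolean failedOnSymlink = false;
        try {
            Files.createDirectories(link);
        } catch (FileAlreadyExistsException e) {
            failedOnSymlink = true;
        }
        if (!failedOnSymlink) {
            throw new AssertionError("expected FileAlreadyExistsException on symlink");
        }

        // Resolving symlinks before creating the directories avoids the problem.
        Files.createDirectories(link.toRealPath());
        System.out.println("ok");
    }
}
```

This is why the failure only appears when the working directory (here `/tmp` on the EMR host) is reached through a symbolic link.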
[jira] [Commented] (FLINK-29398) Utilize Rack Awareness in Flink Consumer
[ https://issues.apache.org/jira/browse/FLINK-29398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622444#comment-17622444 ] Jeremy DeGroot commented on FLINK-29398: Here's the FLIP page I made for this https://cwiki.apache.org/confluence/display/FLINK/FLIP-268%3A+Kafka+Rack+Awareness > Utilize Rack Awareness in Flink Consumer > > > Key: FLINK-29398 > URL: https://issues.apache.org/jira/browse/FLINK-29398 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Jeremy DeGroot >Assignee: Jeremy DeGroot >Priority: Major > > [KIP-708|https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams] > was implemented some time ago in Kafka. This allows brokers and consumers to > communicate about the rack (or AWS Availability Zone) they're located in. > Reading from a local broker can save money in bandwidth and improve latency > for your consumers. > Flink Kafka consumers currently cannot easily use rack awareness if they're > deployed across multiple racks or availability zones, because they have no > control over which rack the Task Manager they'll be assigned to may be in. > This improvement proposes that a Kafka Consumer could be configured with a > callback or Future that could be run when it's being configured on the task > manager, that will set the appropriate value at runtime if a value is > provided. -- This message was sent by Atlassian Jira (v8.20.10#820010)
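On the Kafka client side, the rack awareness that FLIP-268 builds on amounts to setting the standard KIP-392 consumer property `client.rack`, which the broker's replica selector matches against `broker.rack`. The callback-based resolution on the Task Manager is the proposal here, not an existing Flink API; a dependency-free sketch of the idea (plain `java.util.Properties` stands in for the Kafka consumer config, and the property values are placeholders):

```java
import java.util.Properties;
import java.util.function.Supplier;

public class RackAwareConsumerProps {
    // The FLIP's idea: resolve the rack/AZ on the Task Manager at runtime via a
    // user-supplied callback, then inject it into the Kafka consumer properties.
    static Properties withRack(Properties consumerProps, Supplier<String> rackIdSupplier) {
        String rack = rackIdSupplier.get();
        if (rack != null && !rack.isEmpty()) {
            // "client.rack" is the standard KIP-392 consumer property.
            consumerProps.setProperty("client.rack", rack);
        }
        return consumerProps;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        // In practice the supplier might read the AWS Availability Zone of the
        // Task Manager host from instance metadata.
        withRack(props, () -> "us-east-1a");
        if (!"us-east-1a".equals(props.getProperty("client.rack"))) {
            throw new AssertionError("client.rack not applied");
        }
        System.out.println("client.rack=" + props.getProperty("client.rack"));
    }
}
```

The open design question in the ticket is only where the supplier runs (on the Task Manager, after scheduling), since the rack cannot be known at job-graph construction time.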
[jira] [Commented] (FLINK-29713) Kubernetes operator should restart failed jobs
[ https://issues.apache.org/jira/browse/FLINK-29713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622391#comment-17622391 ]

Danny Cranmer commented on FLINK-29713:
---------------------------------------

Did you try using the job restart strategy? [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#restart-strategy-type]

It does not cover the reconfiguration, but it will restart failed jobs.

> Kubernetes operator should restart failed jobs
> ----------------------------------------------
>
>                 Key: FLINK-29713
>                 URL: https://issues.apache.org/jira/browse/FLINK-29713
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>            Reporter: Peter Vary
>            Assignee: Peter Vary
>            Priority: Major
>             Fix For: kubernetes-operator-1.3.0
>
>
> It would be good to have the possibility to restart the Flink Application if
> it goes to {{FAILED}} state.
> This could be used to restart, and reconfigure the job dynamically in the
> application {{main}} method if the current application can not handle the
> incoming data
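For reference, the restart strategy Danny links is plain Flink configuration; a minimal flink-conf.yaml fragment (the attempt count and delay are illustrative values):

```yaml
# Restart a failed job up to 3 times, waiting 10 s between attempts.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

As noted in the comment, this restarts the job in place but cannot re-run the application `main` method to reconfigure it.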
[GitHub] [flink] snuyanzin commented on pull request #20772: [FLINK-29214][Table API/SQL] Remove usages of deprecated Aggregate#indicator
snuyanzin commented on PR #20772: URL: https://github.com/apache/flink/pull/20772#issuecomment-1287268456 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29398) Utilize Rack Awareness in Flink Consumer
[ https://issues.apache.org/jira/browse/FLINK-29398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622382#comment-17622382 ]

Jeremy DeGroot commented on FLINK-29398:
----------------------------------------

[~martijnvisser] If I'm reading KIP-392 correctly, that's been implemented to take advantage of metadata like rack awareness. By implementing this, we'd get the benefit of KIP-392 (at least as it relates to the rack ID).
[GitHub] [flink] snuyanzin commented on pull request #20773: [FLINK-29215][Table SQL/API] Use config based constructors for converter rules instead of deprecated
snuyanzin commented on PR #20773:
URL: https://github.com/apache/flink/pull/20773#issuecomment-1287219472

   @flinkbot run azure
[GitHub] [flink] RyanSkraba commented on a diff in pull request #20258: [FLINK-28522][tests][JUnit5 migration] flink-sequence-file
RyanSkraba commented on code in PR #20258:
URL: https://github.com/apache/flink/pull/20258#discussion_r1001994132

## flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java: ##

@@ -19,70 +19,51 @@
 package org.apache.flink.formats.sequencefile;
 
 import org.apache.hadoop.conf.Configuration;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Before;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.HamcrestCondition.matching;
-
 /** Tests for the {@link SerializableHadoopConfiguration}. */
-public class SerializableHadoopConfigurationTest {
+class SerializableHadoopConfigurationTest {
 
     private static final String TEST_KEY = "test-key";
     private static final String TEST_VALUE = "test-value";
 
     private Configuration configuration;
 
-    @Before
-    public void createConfigWithCustomProperty() {
+    @BeforeEach
+    void createConfigWithCustomProperty() {
         this.configuration = new Configuration();
         configuration.set(TEST_KEY, TEST_VALUE);
     }
 
     @Test
-    public void customPropertiesSurviveSerializationDeserialization()
+    void customPropertiesSurviveSerializationDeserialization()
             throws IOException, ClassNotFoundException {
         final SerializableHadoopConfiguration serializableConfigUnderTest =
                 new SerializableHadoopConfiguration(configuration);
         final byte[] serializedConfigUnderTest = serializeAndGetBytes(serializableConfigUnderTest);
         final SerializableHadoopConfiguration deserializableConfigUnderTest =
                 deserializeAndGetConfiguration(serializedConfigUnderTest);
-        assertThat(deserializableConfigUnderTest.get())
-                .satisfies(matching(hasTheSamePropertiesAs(configuration)));
-    }
-
-    // Matchers //
-
-    private static TypeSafeMatcher<Configuration> hasTheSamePropertiesAs(
-            final Configuration expectedConfig) {
-        return new TypeSafeMatcher<Configuration>() {
-            @Override
-            protected boolean matchesSafely(Configuration actualConfig) {
-                final String value = actualConfig.get(TEST_KEY);
-                return actualConfig != expectedConfig
-                        && value != null
-                        && expectedConfig.get(TEST_KEY).equals(value);
-            }
-
-            @Override
-            public void describeTo(Description description) {
-                description
-                        .appendText("a Hadoop Configuration with property: key=")
-                        .appendValue(TEST_KEY)
-                        .appendText(" and value=")
-                        .appendValue(TEST_VALUE);
-            }
-        };
+        Assertions.assertThat(deserializableConfigUnderTest.get())
+                .matches(
+                        actualConfig -> {
+                            final String value = actualConfig.get(TEST_KEY);
+                            return actualConfig != configuration

Review Comment:
   This needed to be rewritten in order to compile. I squashed it into the single commit.
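The migration in this diff replaces a Hamcrest `TypeSafeMatcher` with a plain predicate passed to AssertJ's `matches(...)`. A dependency-free sketch of that pattern, where `java.util.Properties` stands in for Hadoop's `Configuration` (an assumption made purely so the sketch runs on the stdlib):

```java
import java.util.Properties;
import java.util.function.Predicate;

public class PredicateMigrationSketch {
    static final String TEST_KEY = "test-key";
    static final String TEST_VALUE = "test-value";

    // The old TypeSafeMatcher boils down to this predicate: "a different object
    // instance that carries the same property value". AssertJ's
    // assertThat(x).matches(predicate) accepts exactly this shape.
    static Predicate<Properties> samePropertiesAs(Properties expected) {
        return actual -> actual != expected
                && actual.getProperty(TEST_KEY) != null
                && expected.getProperty(TEST_KEY).equals(actual.getProperty(TEST_KEY));
    }

    public static void main(String[] args) {
        Properties expected = new Properties();
        expected.setProperty(TEST_KEY, TEST_VALUE);
        Properties roundTripped = new Properties();
        roundTripped.setProperty(TEST_KEY, TEST_VALUE);

        if (!samePropertiesAs(expected).test(roundTripped)) {
            throw new AssertionError("predicate should accept an equal copy");
        }
        if (samePropertiesAs(expected).test(expected)) {
            throw new AssertionError("predicate must reject the same instance");
        }
        System.out.println("ok");
    }
}
```

The trade-off versus Hamcrest is losing the `describeTo` failure message; AssertJ reports the failing predicate and actual value instead.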
[GitHub] [flink] ferenc-csaky commented on pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"
ferenc-csaky commented on PR #21127:
URL: https://github.com/apache/flink/pull/21127#issuecomment-1287199318

   @flinkbot run azure
[GitHub] [flink] RyanSkraba commented on pull request #19897: [FLINK-27885][tests][JUnit5 migration] flink-csv
RyanSkraba commented on PR #19897:
URL: https://github.com/apache/flink/pull/19897#issuecomment-1287185674

   Thanks for the review -- I've addressed the comments, except for `org.apache.flink.formats.csv.CsvFilesystemBatchITCase`, which is based on a hierarchy of tests across modules that need to be migrated together. I typically leave these to be migrated at the same time later (and in this case it will likely be a big undertaking, as there are at least 76 classes in the hierarchy).
[jira] [Commented] (FLINK-25993) Option to disable Kryo.setRegistrationRequired(false)
[ https://issues.apache.org/jira/browse/FLINK-25993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622344#comment-17622344 ]

Galen Warren commented on FLINK-25993:
--------------------------------------

[~chesnay] [~diagonalbishop] I found your old discussion thread on this topic: [https://lists.apache.org/thread/0bzro3qmmsqrz5kh7lmx3xwfyj8wl5rk]

I'm also interested in this feature, and I have it working in a local fork. Would a PR be welcome here? Thanks.

> Option to disable Kryo.setRegistrationRequired(false)
> -----------------------------------------------------
>
>                 Key: FLINK-25993
>                 URL: https://issues.apache.org/jira/browse/FLINK-25993
>             Project: Flink
>          Issue Type: New Feature
>    Affects Versions: 1.14.3
>            Reporter: Shane Bishop
>            Priority: Minor
>
> I would like to request a mechanism that a Flink library user could use to
> optionally disable Kryo.setRegistrationRequired(false).
> The motivation is that Kryo.setRegistrationRequired(true) was made the safe
> default in [this commit|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00]
> (specifically the change was [this line|https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130]
> in the commit). This default is overridden in the 1.14.3 Flink release (see
> [KryoSerializer.java|https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492]
> and
> [FlinkScalaKryoInstantiator.scala|https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46]).
> Reference to thread in mailing list:
> https://lists.apache.org/thread/0bzro3qmmsqrz5kh7lmx3xwfyj8wl5rk
[jira] [Updated] (FLINK-29496) Add Configuration for STS endpoint when using ASSUME_ROLE credential provider
[ https://issues.apache.org/jira/browse/FLINK-29496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer updated FLINK-29496: -- Summary: Add Configuration for STS endpoint when using ASSUME_ROLE credential provider (was: Unable to configure STS endpoint when using ASSUME_ROLE credential provider in Kinesis connector) > Add Configuration for STS endpoint when using ASSUME_ROLE credential provider > - > > Key: FLINK-29496 > URL: https://issues.apache.org/jira/browse/FLINK-29496 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Affects Versions: 1.16.0, 1.15.2 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > When using Kinesis connector with credentials provider configured as > ASSUME_ROLE in the job running in VPC without internet connection, > credentials provider logic tries to access global STS endpoint, > {{{}sts.amazonaws.com{}}}. However, only regional endpoints for STS are > available in that case. > Connector need support for configuring STS endpoint to allow such use-case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29496) Unable to configure STS endpoint when using ASSUME_ROLE credential provider in Kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-29496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Cranmer updated FLINK-29496:
----------------------------------
    Fix Version/s:     (was: 1.15.3)
                       (was: 1.16.1)
[GitHub] [flink] RyanSkraba commented on pull request #20805: [FLINK-29198][test] Fail after maximum RetryOnException
RyanSkraba commented on PR #20805:
URL: https://github.com/apache/flink/pull/20805#issuecomment-1287136423

   Rebased again and moved some modifications to their own separate commit.
[jira] [Closed] (FLINK-29696) [Doc] Operator helm install command points to wrong repo
[ https://issues.apache.org/jira/browse/FLINK-29696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Márton Balassi closed FLINK-29696.
----------------------------------
    Resolution: Not A Problem

This is intentional. One references the released helm chart, the other the local dev one.

> [Doc] Operator helm install command points to wrong repo
> --------------------------------------------------------
>
>                 Key: FLINK-29696
>                 URL: https://issues.apache.org/jira/browse/FLINK-29696
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Kubernetes Operator
>            Reporter: Yufei Zhang
>            Priority: Minor
>
> In the operator documentation, the repo is added via:
> `helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-/`
>
> But later in Operation -> Helm, the docs instruct us to use
> `helm install flink-kubernetes-operator helm/flink-kubernetes-operator`
> Here we won't be able to download the helm chart since we are not using the
> right repo.
>
> You can assign this Jira to me and I can submit a PR to fix it~
[GitHub] [flink] RyanSkraba commented on a diff in pull request #20805: [FLINK-29198][test] Fail after maximum RetryOnException
RyanSkraba commented on code in PR #20805:
URL: https://github.com/apache/flink/pull/20805#discussion_r1001917794

## flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureTest.java: ##

@@ -18,17 +18,16 @@
 package org.apache.flink.testutils.junit;
 
-import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for the RetryOnFailure annotation. */
-@ExtendWith(RetryExtension.class)
-class RetryOnFailureTest {
+/** Tests for the {@link RetryOnFailure} annotation on JUnit4 {@link RetryRule}. */
+public class RetryOnFailureTest {

Review Comment:
   No problem -- you've been very reactive, thanks! I assume you mean that you want all of the changes back to JUnit4 to be in a separate commit. I'll prepare that for RetryOnExceptionTest as well.
[GitHub] [flink-connector-dynamodb] dannycranmer commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector
dannycranmer commented on code in PR #1:
URL: https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1001914891

## flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbWriteRequestType.java: ##

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Represents a DynamoDb Write Request type. The following types are currently supported
+ *
+ * PUT - Put Request
+ * DELETE - Delete Request
+ */
+@PublicEvolving
+public enum DynamoDbWriteRequestType {
+    PUT,
+    DELETE,
+}

Review Comment:
   Is it worth trying to use a more general Flink type to represent this, something like https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowKind.java? Maybe @MartijnVisser would be able to help?
[GitHub] [flink-connector-dynamodb] dannycranmer commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector
dannycranmer commented on code in PR #1:
URL: https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1001870394

## flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilder.java: ##

@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link DynamoDbSink}.
+ *
+ * The following example shows the minimum setup to create a {@link DynamoDbSink} that writes
+ * records into DynamoDb
+ *
+ * {@code
+ * private static class DummyDynamoDbElementConverter implements ElementConverter {
+ *
+ *     @Override
+ *     public DynamoDbWriteRequest apply(String s) {
+ *         final Map item = new HashMap<>();
+ *         item.put("your-key", AttributeValue.builder().s(s).build());
+ *         return new DynamoDbWriteRequest(
+ *                 WriteRequest.builder()
+ *                         .putRequest(PutRequest.builder()
+ *                                 .item(item)
+ *                                 .build())
+ *                         .build());
+ *     }
+ * }
+ * DynamoDbSink dynamoDbSink = DynamoDbSink.builder()
+ *         .setElementConverter(new DummyDynamoDbElementConverter())
+ *         .setDestinationTableName("your-table-name")
+ *         .build();
+ * }
+ *
+ * If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * {@code maxBatchSize} will be 25
+ * {@code maxInFlightRequests} will be 50
+ * {@code maxBufferedRequests} will be 1
+ * {@code maxBatchSizeInBytes} will be 16 MB i.e. {@code 16 * 1000 * 1000}
+ * {@code maxTimeInBufferMS} will be 5000ms
+ * {@code maxRecordSizeInBytes} will be 400 KB i.e. {@code 400 * 1000}
+ * {@code failOnError} will be false
+ * {@code destinationTableName} destination table for the sink
+ * {@code overwriteByPKeys} will be empty meaning no records deduplication will be performed
+ */

Review Comment:
   This comment is outdated
[jira] [Commented] (FLINK-24119) KafkaITCase.testTimestamps fails due to "Topic xxx already exist"
[ https://issues.apache.org/jira/browse/FLINK-24119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622318#comment-17622318 ] Matthias Pohl commented on FLINK-24119: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42232=logs=d22373ad-b356-55ba-ef18-6ae7deba4552=7d4e458d-e0e0-5f89-c72d-7371ef61b09b=350c6121-8698-59d6-9a85-c8cf427aed84=37447 > KafkaITCase.testTimestamps fails due to "Topic xxx already exist" > - > > Key: FLINK-24119 > URL: https://issues.apache.org/jira/browse/FLINK-24119 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.0, 1.15.0, 1.16.0 >Reporter: Xintong Song >Assignee: Qingsheng Ren >Priority: Critical > Labels: auto-deprioritized-critical, test-stability > Fix For: 1.16.1 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23328=logs=c5f0071e-1851-543e-9a45-9ac140befc32=15a22db7-8faa-5b34-3920-d33c9f0ca23c=7419 > {code} > Sep 01 15:53:20 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 162.65 s <<< FAILURE! - in > org.apache.flink.streaming.connectors.kafka.KafkaITCase > Sep 01 15:53:20 [ERROR] testTimestamps Time elapsed: 23.237 s <<< FAILURE! > Sep 01 15:53:20 java.lang.AssertionError: Create test topic : tstopic failed, > org.apache.kafka.common.errors.TopicExistsException: Topic 'tstopic' already > exists. 
> Sep 01 15:53:20 at org.junit.Assert.fail(Assert.java:89) > Sep 01 15:53:20 at > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226) > Sep 01 15:53:20 at > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112) > Sep 01 15:53:20 at > org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212) > Sep 01 15:53:20 at > org.apache.flink.streaming.connectors.kafka.KafkaITCase.testTimestamps(KafkaITCase.java:191) > Sep 01 15:53:20 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Sep 01 15:53:20 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Sep 01 15:53:20 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Sep 01 15:53:20 at java.lang.reflect.Method.invoke(Method.java:498) > Sep 01 15:53:20 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Sep 01 15:53:20 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Sep 01 15:53:20 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Sep 01 15:53:20 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Sep 01 15:53:20 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > Sep 01 15:53:20 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > Sep 01 15:53:20 at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > Sep 01 15:53:20 at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
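The "already exists" symptom typically comes from a fixed topic name shared across retried or leaking test runs. A common hardening for this class of flakiness (a generic pattern, not necessarily the fix chosen for FLINK-24119) is deriving a unique topic name per execution:

```java
import java.util.UUID;

public class UniqueTestTopic {
    // Derive a fresh topic name per test execution so a topic leaked by an
    // earlier (timed-out or retried) run cannot collide with the next one.
    static String uniqueTopic(String base) {
        return base + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String a = uniqueTopic("tstopic");
        String b = uniqueTopic("tstopic");
        if (!a.startsWith("tstopic-") || a.equals(b)) {
            throw new AssertionError("topic names must be unique per call");
        }
        System.out.println(a);
    }
}
```

The alternative, deleting the topic in teardown, still races against broker-side asynchronous deletion, which is why unique names are usually the more robust choice in integration tests.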
[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
[ https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622315#comment-17622315 ] Yordan Pavlov commented on FLINK-16419: --- Again, thanks for your input [~martijnvisser] > 1. When you're executing a stop-with-savepoint, there is a guarantee that > transactions are committed. Thanks for verifying, those are also our observations. However, once the job is stopped it would then need to re-fetch the whole savepoint again to resume; this can be very slow in our setup and cause huge latency on the normal flow of data processing. Is there a way we can stop (that is, close all transactions) and start the job without losing the state? > 2. For regular savepoints, there is a very small chance that > notifyCheckpointCompleted() has been lost. Virtually nil, but theoretically > possible. What you are describing sounds like some rare race-condition event; this does not match our observations. On recovery from many of the savepoints, Flink would try to recover a previous transaction producer and end with the error: {code:java} The producer attempted to use a producer id which is not currently assigned to its transactional id.{code} In the previous discussions on this ticket I was left with the impression that this is expected behavior and not a bug due to notifyCheckpointCompleted() being lost. If we establish that this behavior is not expected, I can come back with a sequence of events that reproduces the problem. > 3. If after a savepoint there was at least one following successfull > checkpoint, then the chances that same subtask lost two > notifyCheckpointCompleted() calls in a row is nil^2 In our case there are thousands of checkpoints (and savepoints) after the savepoint we recover from. And all those transactions are closed, as we are seeing the data in Kafka, reading only committed. Still, on recovery from the old savepoint some replay is taking place, which causes the problem.
> Avoid to recommit transactions which are known committed successfully to > Kafka upon recovery > > > Key: FLINK-16419 > URL: https://issues.apache.org/jira/browse/FLINK-16419 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Runtime / Checkpointing >Reporter: Jun Qin >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > usability > > When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer > tries to recommit all pre-committed transactions which are in the snapshot, > even if those transactions were successfully committed before (i.e., the call > to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} > returns OK). This may lead to recovery failures when recovering from a very > old snapshot because the transactional IDs in that snapshot may have been > expired and removed from Kafka. For example the following scenario: > # Start a Flink job with FlinkKafkaProducer sink with exactly-once > # Suspend the Flink job with a savepoint A > # Wait for time longer than {{transactional.id.expiration.ms}} + > {{transaction.remove.expired.transaction.cleanup.interval.ms}} > # Recover the job with savepoint A. > # The recovery will fail with the following error: > {noformat} > 2020-02-26 14:33:25,817 INFO > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer > - Attempting to resume transaction Source: Custom Source -> Sink: > Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch > 1202020-02-26 14:33:25,914 INFO org.apache.kafka.clients.Metadata > - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA > 2020-02-26 14:33:26,017 INFO org.apache.kafka.clients.producer.KafkaProducer > - [Producer clientId=producer-1, transactionalId=Source: Custom > Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka > producer with timeoutMillis = 92233720 > 36854775807 ms. 
> 2020-02-26 14:33:26,019 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Sink: Unnamed (1/1) > (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED. > org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: > The producer attempted to use a producer id which is not currently assigned > to its transactional id. > at > org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191) > at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909) > at >
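The expiry scenario in the issue hinges on the two broker-side settings it names, and the verification step described in the comment above relies on read-committed consumption. As a minimal sketch, both can be captured as plain `java.util.Properties` (the property keys are quoted from this thread; the numeric values are illustrative defaults, not values Flink sets):

```java
import java.util.Properties;

public class TxnExpirySketch {
    public static Properties brokerTxnSettings() {
        // Broker-side settings bounding how long a pre-committed transactional ID
        // survives. Once both windows elapse, a recovery that replays
        // commitTransaction() hits an unknown producer id, as in the stack trace above.
        Properties p = new Properties();
        p.setProperty("transactional.id.expiration.ms", "604800000"); // 7 days (illustrative)
        p.setProperty(
                "transaction.remove.expired.transaction.cleanup.interval.ms",
                "3600000"); // 1 hour (illustrative)
        return p;
    }

    public static Properties readCommittedConsumer() {
        // A consumer that only sees committed data, as used in the thread to verify
        // that all transactions after the savepoint were in fact closed.
        Properties p = new Properties();
        p.setProperty("isolation.level", "read_committed");
        return p;
    }
}
```

This is why recovering from a savepoint older than the sum of the two windows fails: the transactional ID stored in the snapshot no longer exists on the broker.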
[GitHub] [flink] XComp commented on a diff in pull request #20805: [FLINK-29198][test] Fail after maximum RetryOnException
XComp commented on code in PR #20805: URL: https://github.com/apache/flink/pull/20805#discussion_r1001878855 ## flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureTest.java: ## @@ -18,17 +18,16 @@ package org.apache.flink.testutils.junit; -import org.apache.flink.testutils.junit.extensions.retry.RetryExtension; - -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for the RetryOnFailure annotation. */ -@ExtendWith(RetryExtension.class) -class RetryOnFailureTest { +/** Tests for the {@link RetryOnFailure} annotation on JUnit4 {@link RetryRule}. */ +public class RetryOnFailureTest { Review Comment: @RyanSkraba sorry for reiterating over the PR again but could you also move this change out into its own commit? Initially, that's what I wanted to do myself. But I realized that I probably would overwrite your authorship. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
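For context, the retry semantics that both the JUnit4 `RetryRule` and the JUnit5 `RetryExtension` wire up can be sketched in plain Java (a hypothetical helper, not the actual Flink test-utils implementation): the test body is re-run until one attempt passes or the retry budget is exhausted, in which case the last failure is rethrown.

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    /**
     * Re-runs a test body up to maxRetries times. Returns the number of attempts
     * that were needed; rethrows the last failure if no attempt succeeded.
     * (Hypothetical helper illustrating RetryOnFailure semantics.)
     */
    public static int runWithRetries(Callable<Void> body, int maxRetries) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                body.call();
                return attempt; // success on this attempt
            } catch (Exception e) {
                last = e; // remember the failure and retry
            }
        }
        throw last;
    }
}
```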
[jira] [Commented] (FLINK-29716) Separate slf4j jar in the lib folder from the distribution
[ https://issues.apache.org/jira/browse/FLINK-29716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622313#comment-17622313 ] Chesnay Schepler commented on FLINK-29716: -- The log4j jars are bundled separately to allow users to switch logging backends or even go back to log4j1. There's no such requirement for slf4j. Should we upgrade to slf4j 2.x, then that is simply what Flink will require, like any other direct dependency. Given that it is a compile dependency (unlike log4j), replacing it isn't as trivially safe as replacing log4j; for example, if we were to start using the new fluent logging API, then replacing it with slf4j v1 is just not an option. > Separate slf4j jar in the lib folder from the distribution > -- > > Key: FLINK-29716 > URL: https://issues.apache.org/jira/browse/FLINK-29716 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.15.2 >Reporter: Alexis Sarda-Espinosa >Priority: Major > > Flink's binary distribution includes several jars under the {{lib}} folder, > which has individual jars for all log4j artifacts. This makes it relatively > easy to swap out those logging jars when necessary, for example when critical > vulnerabilities are found (as was recently the case). > With SLF4J 2.+, some breaking changes mean that many implementations are not > directly backwards compatible, see for example the [notes for > log4j2|https://logging.apache.org/log4j/2.x/log4j-slf4j-impl/index.html]. > This means that, in the future, if swapping logging jars were necessary, the > SLF4J jar might have to be changed as well. > Right now the SLF4J jar is not included separately in the distribution; I > believe it's packed inside the {{flink-dist}} jar, although I'm not sure. It > would be better to separate that as it is done for the default log4j2 jars.
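The compile-time coupling Chesnay refers to can be illustrated with a minimal sketch (stand-in interfaces, not the real slf4j types): code written against the 2.x fluent builder shape below has no counterpart in the 1.x `Logger` interface, so dropping a 1.x jar onto the classpath cannot satisfy it.

```java
public class FluentApiSketch {
    // Stand-in for the builder type that exists only in slf4j 2.x.
    interface LoggingEventBuilder {
        LoggingEventBuilder addKeyValue(String key, Object value);
        void log(String message);
    }

    // Stand-in for the interface shape common to 1.x and 2.x.
    interface Logger {
        void info(String message);
    }

    // Stand-in for 2.x, which adds the fluent entry point.
    interface Logger2 extends Logger {
        LoggingEventBuilder atInfo();
    }

    // This method compiles only against the 2.x-shaped interface; a 1.x
    // Logger cannot be substituted without breaking the build.
    static void logStructured(Logger2 log) {
        log.atInfo().addKeyValue("jobId", "abc").log("job started");
    }
}
```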
[jira] [Closed] (FLINK-29694) Support tolerations in helm template for flink operator deployment
[ https://issues.apache.org/jira/browse/FLINK-29694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-29694. -- Fix Version/s: kubernetes-operator-1.3.0 Resolution: Fixed merged to main f2c34abc466c9e33745a9b2b62e4ac2ad640ef39 > Support tolerations in helm template for flink operator deployment > -- > > Key: FLINK-29694 > URL: https://issues.apache.org/jira/browse/FLINK-29694 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Max Lim >Assignee: Max Lim >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.3.0 > > > The Operator's deployment should allow specifying tolerations. There are > cases where we want the operator to not be scheduled onto inappropriate > nodes. In such cases, it would be great if we can support this via the Helm > Chart.
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #405: [FLINK-29694] Support tolerations in helm template for flink operator deployment
gyfora merged PR #405: URL: https://github.com/apache/flink-kubernetes-operator/pull/405
[jira] [Closed] (FLINK-29715) Expose max_parallelism in JSON plan
[ https://issues.apache.org/jira/browse/FLINK-29715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-29715. -- Resolution: Won't Fix Turns out this is already part of the JobDetailsInfo > Expose max_parallelism in JSON plan > --- > > Key: FLINK-29715 > URL: https://issues.apache.org/jira/browse/FLINK-29715 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Minor > > The JobGraph json plan currently only contains vertex parallelism but not the > max_parallelism. This could be very useful information to also show on the UI > for debugging data skew/performance issues or for any tooling that relies on > the jobgraph information gathered from the rest endpoint.
[jira] [Updated] (FLINK-29727) Fall back to flink-conf.yaml if no JOB_MANAGER_RPC_ADDRESS is specified
[ https://issues.apache.org/jira/browse/FLINK-29727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Fedulov updated FLINK-29727: -- Description: Currently {{docker-entrypoint.sh}} always overrides {{jobmanager.rpc.address}} in {{flink-conf.yaml}}, either with the environment variable {{JOB_MANAGER_RPC_ADDRESS}} or with the {{hostname}}: [link|https://github.com/apache/flink-docker/blob/3c259f46231b97202925a111a8205193c15bbf78/1.15/scala_2.12-java8-ubuntu/docker-entrypoint.sh#L25]. This causes, for instance, the jobmanager address configured in {{FlinkContainers}} that are based on an existing image (which contains {{docker-entrypoint.sh}}, in contrast to an image built on the fly from {{flink-dist}}) to be overridden by the hostname. TMs then fail to connect to the JM. A workaround is to use {{TestContainersSettings}} to set {{JOB_MANAGER_RPC_ADDRESS}}, which is suboptimal from the user's perspective. The configuration in flink-conf.yaml should instead be kept if JOB_MANAGER_RPC_ADDRESS is not passed explicitly and only overridden by the {{hostname}} if nothing was specified in the config. > Fall back to flink-conf.yaml if no JOB_MANAGER_RPC_ADDRESS is specified > --- > > Key: FLINK-29727 > URL: https://issues.apache.org/jira/browse/FLINK-29727 > Project: Flink > Issue Type: Improvement > Components: flink-docker >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > > Currently {{docker-entrypoint.sh}} always overrides > {{jobmanager.rpc.address}} in {{flink-conf.yaml}}, either with the environment > variable {{JOB_MANAGER_RPC_ADDRESS}} or with the {{hostname}}. This causes, > for instance, the jobmanager address configured in {{FlinkContainers}} that are > based on an existing image to be overridden by the hostname; TMs then fail to > connect to the JM. A workaround is to use {{TestContainersSettings}} to set > {{JOB_MANAGER_RPC_ADDRESS}}, which is suboptimal from the user's perspective. > The configuration in flink-conf.yaml should instead be kept if > JOB_MANAGER_RPC_ADDRESS is not passed explicitly and only overridden by the > {{hostname}} if nothing was specified in the config.
[jira] [Created] (FLINK-29727) Fall back to flink-conf.yaml if no JOB_MANAGER_RPC_ADDRESS is specified
Alexander Fedulov created FLINK-29727: - Summary: Fall back to flink-conf.yaml if no JOB_MANAGER_RPC_ADDRESS is specified Key: FLINK-29727 URL: https://issues.apache.org/jira/browse/FLINK-29727 Project: Flink Issue Type: Improvement Components: flink-docker Reporter: Alexander Fedulov Assignee: Alexander Fedulov Currently {{docker-entrypoint.sh}} always overrides {{jobmanager.rpc.address}} in {{flink-conf.yaml}}, either with the environment variable {{JOB_MANAGER_RPC_ADDRESS}} or with the {{hostname}}: [link|https://github.com/apache/flink-docker/blob/3c259f46231b97202925a111a8205193c15bbf78/1.15/scala_2.12-java8-ubuntu/docker-entrypoint.sh#L25]. This causes, for instance, the jobmanager address configured in `FlinkContainers` that are based on an existing image (which contains `docker-entrypoint.sh`, in contrast to an image built on the fly from flink-dist) to be overridden by the hostname. TMs then fail to connect to the JM. A workaround is to use {{TestContainersSettings}} to set {{JOB_MANAGER_RPC_ADDRESS}}, which is suboptimal from the user's perspective. The configuration in flink-conf.yaml should instead be kept if JOB_MANAGER_RPC_ADDRESS is not passed explicitly and only overridden by the {{hostname}} if nothing was specified in the config.
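The proposed precedence can be sketched as a small pure function (hypothetical helper names; the actual change would live in the shell logic of `docker-entrypoint.sh`): an explicit `JOB_MANAGER_RPC_ADDRESS` wins, a value already present in `flink-conf.yaml` is kept otherwise, and the container hostname applies only when both are absent.

```java
import java.util.Map;
import java.util.Optional;

public class RpcAddressFallback {
    /**
     * Resolves jobmanager.rpc.address with the fallback order proposed in
     * FLINK-29727: env var > flink-conf.yaml > hostname.
     * (Illustrative sketch, not the actual entrypoint code.)
     */
    public static String resolve(String envAddress, Map<String, String> flinkConf, String hostname) {
        if (envAddress != null && !envAddress.isEmpty()) {
            return envAddress; // explicit JOB_MANAGER_RPC_ADDRESS always wins
        }
        // keep an existing flink-conf.yaml entry; fall back to hostname last
        return Optional.ofNullable(flinkConf.get("jobmanager.rpc.address")).orElse(hostname);
    }
}
```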
[GitHub] [flink-web] Myasuka commented on a diff in pull request #574: Announcement blogpost for the 1.16 release
Myasuka commented on code in PR #574: URL: https://github.com/apache/flink-web/pull/574#discussion_r1001703393 ## _posts/2022-10-15-1.16-announcement.md: ## @@ -0,0 +1,402 @@ +--- +layout: post +title: "Announcing the Release of Apache Flink 1.16" +subtitle: "" +date: 2022-10-15T08:00:00.000Z +categories: news +authors: +- godfreyhe: + name: "Godfrey He" + twitter: "godfreyhe" + +--- + +Apache Flink continues to grow at a rapid pace and is one of the most active +communities in Apache. Flink 1.16 had over 230 contributors enthusiastically participating, +with 19 FLIPs and 900+ issues completed, bringing a lot of exciting features to the community. + +Flink has become the leading role and factual standard of stream processing, +and the concept of the unification of stream and batch data processing is gradually gaining recognition +and is being successfully implemented in more and more companies. Previously, +the integrated stream and batch concept placed more emphasis on a unified API and +a unified computing framework. This year, based on this, Flink proposed +the next development direction of [Flink-Streaming Warehouse](https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821) (Streamhouse), +which further upgraded the scope of stream-batch integration: it truly completes not only +the unified computation but also unified storage, thus realizing unified real-time analysis. + +In 1.16, the Flink community has completed many improvements for both batch and stream processing: + +- For batch processing, all-round improvements in ease of use, stability and performance + have been completed. 1.16 is a milestone version of Flink batch processing and an important + step towards maturity. + - Ease of use: with the introduction of SQL Gateway and full compatibility with Hive Server2, + users can submit Flink SQL jobs and Hive SQL jobs very easily, and it is also easy to + connect to the original Hive ecosystem. 
+ - Functionality: Introduce Join hints which let Flink SQL users manually specify join strategies + to avoid unreasonable execution plans. The compatibility of Hive SQL has reached 94%, + and users can migrate Hive to Flink at a very low cost. + - Stability: Propose a speculative execution mechanism to reduce the long tail sub-tasks of + a job and improve the stability. Improve HashJoin and introduce failure rollback mechanism + to avoid join failure. + - Performance: Introduce dynamic partition pruning to reduce the Scan I/O and improve join + processing for the star-schema queries. There is 30% improvement in the TPC-DS benchmark. + We can use hybrid shuffle mode to improve resource usage and processing performance. +- For stream processing, there are a number of significant improvements: + - Changelog State Backend provides users with second or even millisecond checkpoints to + dramatically improve the fault tolerance experience, while providing a smaller end-to-end + latency experience for transactional Sink jobs. + - Lookup join is widely used in stream processing. Slow lookup speed, low throughput and + delay update are resolved through common cache mechanism, asynchronous io and retriable lookup. + These features are very useful, solving the pain points that users often complain about, + and supporting richer scenarios. + - From the first day of the birth of Flink SQL, there were some non-deterministic operations + that could cause incorrect results or exceptions, which caused great distress to users. + In 1.16, we spent a lot of effort to solve most of the problems, and we will continue to + improve in the future. + +With the further refinement of the integration of stream and batch, and the continuous iteration of +the Flink Table Store ([0.2 has been released](https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html)), +the Flink community is pushing the Streaming warehouse from concept to reality and maturity step by step. 
+ +# Understanding Streaming Warehouses + +To be precise, a streaming warehouse is to make data warehouse streaming, which allows the data +for each layer in the whole warehouse to flow in real-time. The goal is to realize +a Streaming Service with end-to-end real-time performance through a unified API and computing framework. +Please refer to [the article](https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821) +for more details. + +# Batch processing + +Flink is a unified stream batch processing engine, stream processing has become the leading role +thanks to our long-term investment. We’re also putting more effort to improve batch processing +to make it an excellent computing engine. This makes the overall experience of stream batch +unification smoother. + +## SQL Gateway + +The feedback from various channels
[GitHub] [flink] dannycranmer commented on a diff in pull request #21129: [FLINK-29496] [Connector/Kinesis] Add configuration option for setting STS endpoint for AWS connectors
dannycranmer commented on code in PR #21129: URL: https://github.com/apache/flink/pull/21129#discussion_r1001784454 ## flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java: ## @@ -209,6 +200,28 @@ public void refresh() { } } +private static AWSSecurityTokenService createStsClient( +final Properties configProps, final String configPrefix) { +final String region = configProps.getProperty(AWSConfigConstants.AWS_REGION); +final AWSSecurityTokenServiceClientBuilder stsClientBuilder = +AWSSecurityTokenServiceClientBuilder.standard() +.withCredentials( +getCredentialsProvider( +configProps, + AWSConfigConstants.roleCredentialsProvider(configPrefix))); + +if (configProps.containsKey(AWSConfigConstants.AWS_ROLE_STS_ENDPOINT)) { +AwsClientBuilder.EndpointConfiguration endpointConfiguration = +new AwsClientBuilder.EndpointConfiguration( + configProps.getProperty(AWSConfigConstants.AWS_ROLE_STS_ENDPOINT), region); +stsClientBuilder.withEndpointConfiguration(endpointConfiguration); Review Comment: Can you add a unit test for this code path? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
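The code path the reviewer asks to cover boils down to a single decision: use the custom STS endpoint when the property is present, otherwise fall back to the regional default. A plain-Java sketch of that decision (the key strings and the default-endpoint format here are hypothetical; the real code reads constants from `AWSConfigConstants` and wires an `AwsClientBuilder.EndpointConfiguration`):

```java
import java.util.Optional;
import java.util.Properties;

public class StsEndpointSketch {
    // Hypothetical stand-ins for the AWSConfigConstants keys used in the PR.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ROLE_STS_ENDPOINT = "aws.role.stsEndpoint";

    /** Custom STS endpoint if configured, else a regional default. */
    public static String resolveStsEndpoint(Properties configProps) {
        return Optional.ofNullable(configProps.getProperty(AWS_ROLE_STS_ENDPOINT))
                .orElseGet(() ->
                        // assumed regional default format, for illustration only
                        "https://sts." + configProps.getProperty(AWS_REGION) + ".amazonaws.com");
    }
}
```

A unit test for the PR would assert exactly these two branches: with the property set, the builder receives the configured endpoint; without it, the region-derived default applies.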
[GitHub] [flink] zentol commented on pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on PR #20757: URL: https://github.com/apache/flink/pull/20757#issuecomment-1286949316 ``` Oct 18 11:05:28 [ERROR] Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 42.188 s <<< FAILURE! - in org.apache.flink.architecture.rules.ApiAnnotationRules Oct 18 11:05:28 [ERROR] ApiAnnotationRules.PUBLIC_API_METHODS_USE_ONLY_PUBLIC_API_TYPES Time elapsed: 0.282 s <<< ERROR! Oct 18 11:05:28 com.tngtech.archunit.library.freeze.StoreUpdateFailedException: Updating frozen violations is disabled (enable by configuration freeze.store.default.allowStoreUpdate=true) Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.ViolationStoreFactory$TextFileBasedViolationStore.save(ViolationStoreFactory.java:125) Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule$ViolationStoreLineBreakAdapter.save(FreezingArchRule.java:279) Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule.removeObsoleteViolationsFromStore(FreezingArchRule.java:147) Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule.removeObsoleteViolationsFromStoreAndReturnNewViolations(FreezingArchRule.java:139) Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule.evaluate(FreezingArchRule.java:120) Oct 18 11:05:28 at com.tngtech.archunit.lang.ArchRule$Assertions.check(ArchRule.java:80) Oct 18 11:05:28 at com.tngtech.archunit.library.freeze.FreezingArchRule.check(FreezingArchRule.java:96) ``` :weary: What is it this time... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on PR #20757: URL: https://github.com/apache/flink/pull/20757#issuecomment-1286942416 FYI: my fix has been merged; we're just waiting for the next japicmp release.
[GitHub] [flink] flinkbot commented on pull request #21129: [FLINK-29496] [Connector/Kinesis] Add configuration option for setting STS endpoint for AWS connectors
flinkbot commented on PR #21129: URL: https://github.com/apache/flink/pull/21129#issuecomment-1286907632 ## CI report: * 4395f880b56dbc67078c7c222d1f661e50ac8f02 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-29713) Kubernetes operator should restart failed jobs
[ https://issues.apache.org/jira/browse/FLINK-29713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-29713: --- Fix Version/s: kubernetes-operator-1.3.0 > Kubernetes operator should restart failed jobs > -- > > Key: FLINK-29713 > URL: https://issues.apache.org/jira/browse/FLINK-29713 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Peter Vary >Assignee: Peter Vary >Priority: Major > Fix For: kubernetes-operator-1.3.0 > > > It would be good to have the possibility to restart the Flink Application if > it goes to {{FAILED}} state. > This could be used to restart, and reconfigure the job dynamically in the > application {{main}} method if the current application can not handle the > incoming data
[jira] [Assigned] (FLINK-29713) Kubernetes operator should restart failed jobs
[ https://issues.apache.org/jira/browse/FLINK-29713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-29713: -- Assignee: Peter Vary > Kubernetes operator should restart failed jobs > -- > > Key: FLINK-29713 > URL: https://issues.apache.org/jira/browse/FLINK-29713 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Peter Vary >Assignee: Peter Vary >Priority: Major > > It would be good to have the possibility to restart the Flink Application if > it goes to {{FAILED}} state. > This could be used to restart, and reconfigure the job dynamically in the > application {{main}} method if the current application can not handle the > incoming data
[jira] [Updated] (FLINK-29496) Unable to configure STS endpoint when using ASSUME_ROLE credential provider in Kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-29496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29496: --- Labels: pull-request-available (was: ) > Unable to configure STS endpoint when using ASSUME_ROLE credential provider > in Kinesis connector > > > Key: FLINK-29496 > URL: https://issues.apache.org/jira/browse/FLINK-29496 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Affects Versions: 1.16.0, 1.15.2 >Reporter: Aleksandr Pilipenko >Assignee: Aleksandr Pilipenko >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0, 1.15.3, 1.16.1 > > > When using the Kinesis connector with the credentials provider configured as > ASSUME_ROLE in a job running in a VPC without internet connection, the > credentials provider logic tries to access the global STS endpoint, > {{{}sts.amazonaws.com{}}}. However, only regional endpoints for STS are > available in that case. > The connector needs support for configuring the STS endpoint to allow such a use case.
[GitHub] [flink] z3d1k opened a new pull request, #21129: [FLINK-29496] [Connector/Kinesis] Add configuration option for setting STS endpoint for AWS connectors
z3d1k opened a new pull request, #21129: URL: https://github.com/apache/flink/pull/21129 ## What is the purpose of the change When using the `ASSUME_ROLE` credentials provider option for AWS connectors, the STS endpoint is chosen by the AWS SDK. In some use cases the default endpoint may be unreachable; for example, the global endpoint is not reachable when the job is running inside a VPC without internet connection. ## Brief change log Adds a configuration option to set the STS service endpoint when using the `ASSUME_ROLE` credentials provider. ## Verifying this change - Unit tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs / JavaDocs
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1764#comment-1764 ] Jakub Partyka commented on FLINK-29692: --- It would be really nice if Windowing TVFs supported early/late fires. For us early fire is a must, so we are forced to use the legacy group window agg, which lacks support for several optimizations: Split Distinct Aggregation and Local-Global Aggregation. > Support early/late fires for Windowing TVFs > --- > > Key: FLINK-29692 > URL: https://issues.apache.org/jira/browse/FLINK-29692 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Affects Versions: 1.15.2 >Reporter: Canope Nerda >Priority: Major > > I have cases where I need to 1) output data as soon as possible and 2) handle > late arriving data to achieve eventual correctness. In the logic, I need to > do window deduplication, which is based on Windowing TVFs, and according to the > source code, early/late fires are not supported yet in Windowing TVFs. > Actually 1) contradicts 2). Without early/late fires, we have to > compromise: either live with fresh incorrect data or tolerate excess latency > for correctness.
[GitHub] [flink] ferenc-csaky commented on a diff in pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"
ferenc-csaky commented on code in PR #21127: URL: https://github.com/apache/flink/pull/21127#discussion_r1001724180 ## flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java: ## @@ -28,21 +28,100 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** Tests for the LIST command. */ class CliFrontendListTest extends CliFrontendTestBase { +private static final List<Long> TRICKY_START_TIMES = +Arrays.asList( +1664177946934L, Review Comment: Yeah, I thought of something like that by changing the code; will update the PR in a bit.
[jira] [Created] (FLINK-29726) Supports hive stddev_samp function by native implementation
dalongliu created FLINK-29726: - Summary: Supports hive stddev_samp function by native implementation Key: FLINK-29726 URL: https://issues.apache.org/jira/browse/FLINK-29726 Project: Flink Issue Type: Sub-task Reporter: dalongliu
[jira] [Created] (FLINK-29725) Supports hive std function by native implementation
dalongliu created FLINK-29725: - Summary: Supports hive std function by native implementation Key: FLINK-29725 URL: https://issues.apache.org/jira/browse/FLINK-29725 Project: Flink Issue Type: Sub-task Reporter: dalongliu
[jira] [Created] (FLINK-29724) Supports hive last_value function by native implementation
dalongliu created FLINK-29724: - Summary: Supports hive last_value function by native implementation Key: FLINK-29724 URL: https://issues.apache.org/jira/browse/FLINK-29724 Project: Flink Issue Type: Sub-task Reporter: dalongliu Fix For: 1.17.0
[GitHub] [flink] pnowojski commented on pull request #20033: [FLINK-27944][runtime] Move input metrics out of the inputGate loop, …
pnowojski commented on PR #20033: URL: https://github.com/apache/flink/pull/20033#issuecomment-1286876812 @zhougit86 , could you rebase the PR on the latest master?
[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
[ https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1760#comment-1760 ] Martijn Visser commented on FLINK-16419: I've synced offline with [~pnowojski] on this topic. Here's the summary: 1. When you're executing a stop-with-savepoint, there is a guarantee that transactions are committed. 2. For regular savepoints, there is a very small chance that {{notifyCheckpointCompleted()}} has been lost. Virtually nil, but theoretically possible. 3. If after a savepoint there was at least one following successful checkpoint, then the chance that the same subtask lost two {{notifyCheckpointCompleted()}} calls in a row is nil^2 There were discussions in the past (as also mentioned at the bottom of this ticket) about creating a checkpoint/savepoint which makes sure that these transactions were committed. However, it would require a potentially breaking API change and it's a non-trivial effort, which is why it has never been done. One thing to check is if {{org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer#ignoreFailuresAfterTransactionTimeout}} also exists for KafkaSink. If not, that should be addressed. > Avoid to recommit transactions which are known committed successfully to > Kafka upon recovery > > > Key: FLINK-16419 > URL: https://issues.apache.org/jira/browse/FLINK-16419 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Runtime / Checkpointing >Reporter: Jun Qin >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > usability > > When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer > tries to recommit all pre-committed transactions which are in the snapshot, > even if those transactions were successfully committed before (i.e., the call > to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} > returns OK). This may lead to recovery failures when recovering from a very > old snapshot because the transactional IDs in that snapshot may have been > expired and removed from Kafka. For example the following scenario: > # Start a Flink job with FlinkKafkaProducer sink with exactly-once > # Suspend the Flink job with a savepoint A > # Wait for time longer than {{transactional.id.expiration.ms}} + > {{transaction.remove.expired.transaction.cleanup.interval.ms}} > # Recover the job with savepoint A. > # The recovery will fail with the following error: > {noformat} > 2020-02-26 14:33:25,817 INFO > org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer > - Attempting to resume transaction Source: Custom Source -> Sink: > Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 12 > 2020-02-26 14:33:25,914 INFO org.apache.kafka.clients.Metadata > - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA > 2020-02-26 14:33:26,017 INFO org.apache.kafka.clients.producer.KafkaProducer > - [Producer clientId=producer-1, transactionalId=Source: Custom > Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka > producer with timeoutMillis = 9223372036854775807 ms. > 2020-02-26 14:33:26,019 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Sink: Unnamed (1/1) > (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED. > org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: > The producer attempted to use a producer id which is not currently assigned > to its transactional id. > at > org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191) > at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235) > at java.lang.Thread.run(Thread.java:748) > {noformat} > For now, the workaround is to call > {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as > it may hide real transaction timeout errors. > After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible > way is to let JobManager, after successfully notifies all operators the > completion of
[jira] [Updated] (FLINK-29722) Supports hive max function by native implementation
[ https://issues.apache.org/jira/browse/FLINK-29722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dalongliu updated FLINK-29722: -- Parent: FLINK-29717 Issue Type: Sub-task (was: Improvement) > Supports hive max function by native implementation > --- > > Key: FLINK-29722 > URL: https://issues.apache.org/jira/browse/FLINK-29722 > Project: Flink > Issue Type: Sub-task >Reporter: dalongliu >Priority: Major > Fix For: 1.17.0 > >
[jira] [Created] (FLINK-29723) Supports hive first_value function by native implementation
dalongliu created FLINK-29723: - Summary: Supports hive first_value function by native implementation Key: FLINK-29723 URL: https://issues.apache.org/jira/browse/FLINK-29723 Project: Flink Issue Type: Sub-task Reporter: dalongliu Fix For: 1.17.0
[jira] [Created] (FLINK-29722) Supports hive max function by native implementation
dalongliu created FLINK-29722: - Summary: Supports hive max function by native implementation Key: FLINK-29722 URL: https://issues.apache.org/jira/browse/FLINK-29722 Project: Flink Issue Type: Improvement Reporter: dalongliu Fix For: 1.17.0
[jira] [Created] (FLINK-29721) Supports hive min function by native implementation
dalongliu created FLINK-29721: - Summary: Supports hive min function by native implementation Key: FLINK-29721 URL: https://issues.apache.org/jira/browse/FLINK-29721 Project: Flink Issue Type: Sub-task Reporter: dalongliu Fix For: 1.17.0
[jira] [Created] (FLINK-29720) Supports hive average function by native implementation
dalongliu created FLINK-29720: - Summary: Supports hive average function by native implementation Key: FLINK-29720 URL: https://issues.apache.org/jira/browse/FLINK-29720 Project: Flink Issue Type: Sub-task Reporter: dalongliu Fix For: 1.17.0
[jira] [Created] (FLINK-29719) Supports hive count function by native implementation
dalongliu created FLINK-29719: - Summary: Supports hive count function by native implementation Key: FLINK-29719 URL: https://issues.apache.org/jira/browse/FLINK-29719 Project: Flink Issue Type: Sub-task Reporter: dalongliu Fix For: 1.17.0
[jira] [Created] (FLINK-29718) Supports hive sum function by native implementation
dalongliu created FLINK-29718: - Summary: Supports hive sum function by native implementation Key: FLINK-29718 URL: https://issues.apache.org/jira/browse/FLINK-29718 Project: Flink Issue Type: Sub-task Reporter: dalongliu Fix For: 1.17.0
[GitHub] [flink] MartijnVisser commented on a diff in pull request #21128: [FLINK-29710] Bump minimum supported Hadoop version to 2.10.2
MartijnVisser commented on code in PR #21128: URL: https://github.com/apache/flink/pull/21128#discussion_r1001704804 ## flink-connectors/flink-connector-hive/pom.xml: ## @@ -39,6 +39,11 @@ under the License. 0.9.8 10.10.2.0 1.8.2 + + 2.7.5 Review Comment: You're right, but at https://hive.apache.org/downloads.html it is stated: ``` 7 November 2018: release 2.3.4 available This release works with Hadoop 2.x.y. You can look at the complete [JIRA change log for this release](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12344319=Text=12310843). ``` So my conclusion is: 1. This comment is actually incorrect, because the Hive version that we're pulling in is Hive 2.3.9 https://github.com/apache/flink/blob/master/pom.xml#L171 2. Hive 2.3.9 is also compatible with Hadoop 2.x.y. So we should be able to completely remove all references to `hivemetastore.hadoop.version`, let the Hive connector rely on the Hadoop version from the parent POM, and then it's still supported 3. The hive3 profile that we have refers to Hadoop 2.8.2 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/pom.xml#L1103, but that doesn't make sense since hive3 works with Hadoop 3.x.y (per the same download page). I would argue that this value should be set to Hadoop 3.1.3 since that's the version that we're using for the `cron_hadoop313` profile. @luoyuxia You're the Hive expert, can you give me your opinion?
[GitHub] [flink] Myasuka commented on a diff in pull request #20860: [FLINK-29347] [runtime] Fix ByteStreamStateHandle#read return -1 when read count is 0
Myasuka commented on code in PR #20860: URL: https://github.com/apache/flink/pull/20860#discussion_r1001688491 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java: ## @@ -148,7 +148,7 @@ public int read(byte[] b, int off, int len) throws IOException { index += bytesToCopy; return bytesToCopy; } else { -return -1; +return len == 0 ? 0 : -1; Review Comment: I see. I checked some implementations such as `SSLSocketImpl` and `javax.sound.sampled.AudioInputStream`; they all return -1 from the `#read()` method when no more bytes are available.
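The fix in the diff aligns `ByteStreamStateHandle` with the general `java.io.InputStream.read(byte[], int, int)` contract: a zero-length request returns 0 even at end of stream, while -1 is reserved for a non-empty request that hits end of stream. A minimal sketch of the fixed logic, using a hypothetical `FixedByteStream` stand-in rather than the real Flink class:

```java
import java.io.InputStream;

// Minimal in-memory stream mirroring the fixed logic from the PR:
// a zero-length read at end of stream returns 0, not -1.
class FixedByteStream extends InputStream {
    private final byte[] data;
    private int index;

    FixedByteStream(byte[] data) { this.data = data; }

    @Override
    public int read() {
        return index < data.length ? data[index++] & 0xFF : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (index < data.length) {
            int bytesToCopy = Math.min(len, data.length - index);
            System.arraycopy(data, index, b, off, bytesToCopy);
            index += bytesToCopy;
            return bytesToCopy;
        } else {
            // The fix under review: distinguish an empty request from end of stream.
            return len == 0 ? 0 : -1;
        }
    }
}

public class ReadContractDemo {
    public static void main(String[] args) {
        FixedByteStream in = new FixedByteStream(new byte[] {1, 2, 3});
        byte[] buf = new byte[8];
        System.out.println(in.read(buf, 0, 8)); // 3: drains the stream
        System.out.println(in.read(buf, 0, 0)); // 0: empty request, not EOF
        System.out.println(in.read(buf, 0, 8)); // -1: true end of stream
    }
}
```

With the pre-fix `return -1;`, the second call would report end of stream for a request that asked for nothing, which callers that probe with zero-length reads can misinterpret.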
[GitHub] [flink] zentol commented on a diff in pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"
zentol commented on code in PR #21127: URL: https://github.com/apache/flink/pull/21127#discussion_r1001671017 ## flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java: ## @@ -28,21 +28,100 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** Tests for the LIST command. */ class CliFrontendListTest extends CliFrontendTestBase { +private static final List<Long> TRICKY_START_TIMES = +Arrays.asList( +1664177946934L, Review Comment: This would also allow us to test the grouping-by-state behavior, which I'm not sure even works properly.
[GitHub] [flink] zentol commented on a diff in pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"
zentol commented on code in PR #21127: URL: https://github.com/apache/flink/pull/21127#discussion_r1001669758 ## flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java: ## @@ -28,21 +28,100 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** Tests for the LIST command. */ class CliFrontendListTest extends CliFrontendTestBase { +private static final List<Long> TRICKY_START_TIMES = +Arrays.asList( +1664177946934L, Review Comment: Should be easy enough to modify the code to make the testing easier. ``` private static void printJobStatusMessages(List<JobStatusMessage> jobs) { SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss"); sortJobStatusMessages(jobs) .forEachOrdered( job -> System.out.println( dateFormat.format(new Date(job.getStartTime())) + " : " + job.getJobId() + " : " + job.getJobName() + " (" + job.getJobState() + ")")); } @VisibleForTesting static Stream<JobStatusMessage> sortJobStatusMessages(List<JobStatusMessage> jobs) { Comparator<JobStatusMessage> startTimeComparator = (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime()); Comparator<Map.Entry<JobStatus, List<JobStatusMessage>>> statusComparator = (o1, o2) -> String.CASE_INSENSITIVE_ORDER.compare( o1.getKey().toString(), o2.getKey().toString()); Map<JobStatus, List<JobStatusMessage>> jobsByState = jobs.stream().collect(Collectors.groupingBy(JobStatusMessage::getJobState)); return jobsByState.entrySet().stream() .sorted(statusComparator) .map(Map.Entry::getValue) .flatMap(List::stream) .sorted(startTimeComparator); } ```
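For context on what FLINK-29707 fixes: the subtraction-based `startTimeComparator` quoted above casts a `long` difference to `int`, which can truncate the sign of the result and thereby violate the `Comparator` contract (sorting such data can fail with "Comparison method violates its general contract!"). A small sketch of the failure mode and the contract-safe alternative; the values are illustrative, not taken from the PR:

```java
public class ComparatorOverflowDemo {
    public static void main(String[] args) {
        // A millisecond-timestamp gap just past the int range.
        long a = 2147483648L; // 2^31
        long b = 0L;

        // Buggy pattern: the long difference 2^31 truncates to Integer.MIN_VALUE,
        // so the comparator claims a < b even though a > b.
        int broken = (int) (a - b);
        System.out.println(broken < 0); // true -- wrong sign

        // Contract-safe pattern: compare without subtraction.
        System.out.println(Long.compare(a, b)); // 1 -- a > b, as expected
    }
}
```

`Long.compare` (or `Comparator.comparingLong(JobStatusMessage::getStartTime)` in the code above) avoids the overflow entirely.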
[GitHub] [flink] ferenc-csaky commented on a diff in pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"
ferenc-csaky commented on code in PR #21127: URL: https://github.com/apache/flink/pull/21127#discussion_r1001664100 ## flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java: ## @@ -28,21 +28,100 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** Tests for the LIST command. */ class CliFrontendListTest extends CliFrontendTestBase { +private static final List<Long> TRICKY_START_TIMES = +Arrays.asList( +1664177946934L, Review Comment: That is true, but with the current implementation you cannot verify that from the test. The sorting happens in a `private static` method, which just prints the result out. Is it worth changing that code for this to make it more testable? Or, since the fix itself is kinda trivial, just leave the test case as is? What do you think?
[GitHub] [flink] snuyanzin commented on pull request #19870: [FLINK-27882][tests][table] Migrate flink-scala to JUnit5
snuyanzin commented on PR #19870: URL: https://github.com/apache/flink/pull/19870#issuecomment-1286796533 @flinkbot run azure
[jira] [Commented] (FLINK-29711) Topic notification not present in metadata after 60000 ms.
[ https://issues.apache.org/jira/browse/FLINK-29711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622190#comment-17622190 ] Durgesh Mishra commented on FLINK-29711: [~martijnvisser] I am using the KafkaSink with the at-least-once guarantee. We are using Azure Event Hubs in other projects as well, but there we are not facing this kind of issue. > Topic notification not present in metadata after 60000 ms. > -- > > Key: FLINK-29711 > URL: https://issues.apache.org/jira/browse/FLINK-29711 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4, 1.14.6 >Reporter: Durgesh Mishra >Priority: Major > > Failed to send data to Kafka null with > FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, > closed=false} > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405) > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.base/java.lang.Thread.run(Unknown Source)
[jira] [Updated] (FLINK-29717) Supports hive udaf such as sum/count by native implementation
[ https://issues.apache.org/jira/browse/FLINK-29717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dalongliu updated FLINK-29717: -- Description: On the Flink side, the Hive UDAFs have a unified encapsulation, HiveGenericUDAF, and the intermediate result type of the aggregation function is encapsulated as RAW type, a variable-length data type that is serialized and deserialized by default using the Kryo serializer. BinaryRowData does not support in-place updates to this type, so an aggregation function that uses the RAW type as an intermediate aggregation buffer cannot use hash-based aggregation strategies. Since the intermediate state type of Hive's UDAF is RAW type, it cannot use a hash-based aggregation strategy, and its overall performance in the TPC-DS scenario is more than 2 times worse than that of Flink's built-in functions. After some research, we propose implementing these common UDAFs in a native way. > Supports hive udaf such as sum/count by native implementation > --- > > Key: FLINK-29717 > URL: https://issues.apache.org/jira/browse/FLINK-29717 > Project: Flink > Issue Type: Improvement > Components: Connectors / Hive, Table SQL / Runtime >Reporter: dalongliu >Priority: Major > Fix For: 1.17.0 > > > On the Flink side, the Hive UDAFs have a unified encapsulation, > HiveGenericUDAF, and the intermediate result type of the aggregation function > is encapsulated as RAW type, a variable-length data type that is > serialized and deserialized by default using the Kryo serializer. > BinaryRowData does not support in-place updates to this type, so an > aggregation function that uses the RAW type as an intermediate > aggregation buffer cannot use hash-based aggregation strategies. Since > the intermediate state type of Hive's UDAF is RAW type, it cannot use a > hash-based aggregation strategy, and its overall performance in the TPC-DS > scenario is more than 2 times worse than that of Flink's built-in > functions. After some research, here we propose implementing these common UDAFs > in a native way.
[GitHub] [flink] gaborgsomogyi commented on pull request #20206: [FLINK-25909][runtime][security] Add HBaseDelegationTokenProvider
gaborgsomogyi commented on PR #20206: URL: https://github.com/apache/flink/pull/20206#issuecomment-1286762134 @JackWangCS [This](https://github.com/apache/flink/pull/20206#discussion_r928147951) is the last questionable part to be merged. Are you actively working on this?
[jira] [Created] (FLINK-29717) Supports hive udaf such as sum/count by native implementation
dalongliu created FLINK-29717: - Summary: Supports hive udaf such as sum/count by native implementation Key: FLINK-29717 URL: https://issues.apache.org/jira/browse/FLINK-29717 Project: Flink Issue Type: Improvement Components: Connectors / Hive, Table SQL / Runtime Reporter: dalongliu Fix For: 1.17.0
[GitHub] [flink-table-store] MOBIN-F closed pull request #290: [FLINK-29252]Support create table-store table with 'connector'='table-store'
MOBIN-F closed pull request #290: [FLINK-29252]Support create table-store table with 'connector'='table-store' URL: https://github.com/apache/flink-table-store/pull/290
[jira] [Updated] (FLINK-28157) Table Store Hive Reader supports Hive3
[ https://issues.apache.org/jira/browse/FLINK-28157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-28157: --- Labels: pull-request-available (was: ) > Table Store Hive Reader supports Hive3 > -- > > Key: FLINK-28157 > URL: https://issues.apache.org/jira/browse/FLINK-28157 > Project: Flink > Issue Type: New Feature > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Minor > Labels: pull-request-available > Fix For: table-store-0.3.0 > >
[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #329: [FLINK-28157] Table Store Hive Reader supports Hive3
JingsongLi commented on code in PR #329: URL: https://github.com/apache/flink-table-store/pull/329#discussion_r1001612616 ## flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java: ## @@ -894,18 +895,18 @@ public void testDateAndTimestamp() throws Exception { "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'", "LOCATION '" + path + "'"))); Assert.assertEquals( -Collections.singletonList("1971-01-11\t2022-05-17 17:29:20.0"), Review Comment: Modify test data to `.1`, because hive3 will produce `17:29:20` for `17:29:20.0`
[jira] [Assigned] (FLINK-28157) Table Store Hive Reader supports Hive3
[ https://issues.apache.org/jira/browse/FLINK-28157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee reassigned FLINK-28157: Assignee: Jingsong Lee > Table Store Hive Reader supports Hive3 > -- > > Key: FLINK-28157 > URL: https://issues.apache.org/jira/browse/FLINK-28157 > Project: Flink > Issue Type: New Feature > Components: Table Store >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Minor > Fix For: table-store-0.3.0 > >
[GitHub] [flink] liyubin117 commented on pull request #21116: [FLINK-29679][table] DESCRIBE statement shows column comment
liyubin117 commented on PR #21116: URL: https://github.com/apache/flink/pull/21116#issuecomment-1286729949 @flinkbot run azure
[GitHub] [flink] mbalassi merged pull request #20265: [FLINK-25910][runtime][security] Propagate obtained delegation tokens to TaskManagers
mbalassi merged PR #20265: URL: https://github.com/apache/flink/pull/20265
[jira] [Commented] (FLINK-29472) Create shared release scripts
[ https://issues.apache.org/jira/browse/FLINK-29472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622145#comment-17622145 ] Chesnay Schepler commented on FLINK-29472: -- I've created a [repository|https://github.com/apache/flink-connector-shared-utils/] but there's some permission issue with GitHub. Once that is resolved I'll publish the scripts I wrote into said repository. > Create shared release scripts > - > > Key: FLINK-29472 > URL: https://issues.apache.org/jira/browse/FLINK-29472 > Project: Flink > Issue Type: Sub-task > Components: Release System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > > With the versioning & branching model being identical we should be able to > share all release scripts. Put them into a central location that projects > can rely on (e.g., via a git submodule).
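A minimal sketch of the submodule approach mentioned above. The repository names and paths below are placeholders for illustration, not the actual Flink repositories, and `protocol.file.allow=always` is only needed because the example clones from a local path.

```shell
set -e
workdir=$(mktemp -d)
cd "$workdir"

# Stand-in for the central shared-release-scripts repository.
git init -q release-utils
git -C release-utils -c user.email=dev@example.com -c user.name=dev \
    commit --allow-empty -qm "release scripts"

# A connector repository pulls the shared scripts in as a submodule.
git init -q connector
cd connector
git -c protocol.file.allow=always \
    submodule add "$workdir/release-utils" tools/releasing

# The submodule pin is recorded in .gitmodules and can be bumped per release.
test -f .gitmodules && test -d tools/releasing
```

Each connector repo then pins a specific commit of the shared scripts and updates the pin deliberately, which matches the "central location that projects can rely on" idea.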
[GitHub] [flink] zentol commented on a diff in pull request #21128: [FLINK-29710] Bump minimum supported Hadoop version to 2.10.2
zentol commented on code in PR #21128: URL: https://github.com/apache/flink/pull/21128#discussion_r1001551554 ## docs/content.zh/docs/deployment/filesystems/gcs.md: ## @@ -68,7 +68,7 @@ You must include the following jars in Flink's `lib` directory to connect Flink ``` -We have tested with `flink-shared-hadoop2-uber` version >= `2.8.5-1.8.3`. +We have tested with `flink-shared-hadoop2-uber` version >= `2.10.2-1.8.3`. Review Comment: There is no flink-shaded-hadoop for this version. ## flink-connectors/flink-connector-hive/pom.xml: ## @@ -39,6 +39,11 @@ under the License. 0.9.8 10.10.2.0 1.8.2 + + 2.7.5 Review Comment: If the minimum Hadoop version is 2.10 but some version of the Hive connector needs Hadoop 2.7, then that Hive connector version is simply no longer supported.
[jira] [Closed] (FLINK-29695) Create a utility to report the status of the last savepoint
[ https://issues.apache.org/jira/browse/FLINK-29695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-29695. -- Fix Version/s: kubernetes-operator-1.3.0 Resolution: Fixed merged to main 67d4cef5aadf05bb043c4df0a127c252666b31c1 > Create a utility to report the status of the last savepoint > --- > > Key: FLINK-29695 > URL: https://issues.apache.org/jira/browse/FLINK-29695 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Clara Xiong >Assignee: Clara Xiong >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.3.0 > > > Users want to know the status of the last savepoint, especially for manually > triggered ones, to manage savepoints. > Currently, users can infer the status of the last savepoint (PENDING, > SUCCEEDED and ABANDONED) from jobStatus.triggerId, > lastSavepoint.triggerNonce, spec.job.savepointTriggerNonce and > savepointTriggerNonce from the last reconciliation. If the last savepoint is not > manually triggered, there is no ABANDONED status, only PENDING or SUCCEEDED. > Creating a utility will encapsulate the internal logic of the Flink operator > and guard against regressions from future version changes. >
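The inference rules sketched in the issue description could look roughly like the following Python sketch. The function name, parameters, and exact precedence are hypothetical reconstructions for illustration, not the operator's actual utility.

```python
from enum import Enum

class SavepointStatus(Enum):
    PENDING = "PENDING"
    SUCCEEDED = "SUCCEEDED"
    ABANDONED = "ABANDONED"

def infer_last_savepoint_status(trigger_id, spec_nonce, last_savepoint_nonce):
    """Hypothetical reconstruction of the rules described above:
    a non-empty triggerId means a savepoint is in flight (PENDING);
    a spec nonce matching the last savepoint's nonce means it completed
    (SUCCEEDED); a nonce that advanced without a matching savepoint is
    ABANDONED."""
    if trigger_id:
        return SavepointStatus.PENDING
    if spec_nonce is None or spec_nonce == last_savepoint_nonce:
        return SavepointStatus.SUCCEEDED
    return SavepointStatus.ABANDONED

assert infer_last_savepoint_status("tid-1", 3, 2) is SavepointStatus.PENDING
assert infer_last_savepoint_status("", 3, 3) is SavepointStatus.SUCCEEDED
assert infer_last_savepoint_status("", 3, 2) is SavepointStatus.ABANDONED
```

Encapsulating this in one place is exactly what the ticket asks for, so callers never re-derive the nonce comparison themselves.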
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #406: [FLINK-29695] Create a utility to report the status of the last savep…
gyfora merged PR #406: URL: https://github.com/apache/flink-kubernetes-operator/pull/406
[jira] [Commented] (FLINK-29711) Topic notification not present in metadata after 60000 ms.
[ https://issues.apache.org/jira/browse/FLINK-29711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622133#comment-17622133 ] Martijn Visser commented on FLINK-29711: Which Kafka sink are you using, FlinkKafkaProducer or KafkaSink? Are you using Exactly Once or a different guarantee? I know that Azure Eventhub can emulate Kafka, but we've also seen that these emulators can't exactly mirror everything that Kafka is doing (we've seen the same with RedPanda). My suspicion would be that there's something at the Azure Eventhub side of things that doesn't work exactly the same as in Kafka. > Topic notification not present in metadata after 60000 ms. > -- > > Key: FLINK-29711 > URL: https://issues.apache.org/jira/browse/FLINK-29711 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4, 1.14.6 >Reporter: Durgesh Mishra >Priority: Major > > Failed to send data to Kafka null with > FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, > closed=false} > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405) > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at >
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.base/java.lang.Thread.run(Unknown Source)
[jira] [Closed] (FLINK-29706) Remove japicmp dependency bumps
[ https://issues.apache.org/jira/browse/FLINK-29706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-29706. Resolution: Fixed master: b8672a230a3d34e09fddc7f506e090910e2d202e > Remove japicmp dependency bumps > --- > > Key: FLINK-29706 > URL: https://issues.apache.org/jira/browse/FLINK-29706 > Project: Flink > Issue Type: Technical Debt > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > Way back when we worked on Java 11 support we bumped several dependencies > from japicmp. > These are no longer required for the latest version that we're using.
[GitHub] [flink] zentol merged pull request #21126: [FLINK-29706][build] Remove japicmp dependency bumps
zentol merged PR #21126: URL: https://github.com/apache/flink/pull/21126
[GitHub] [flink] pnowojski commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files
pnowojski commented on PR #20151: URL: https://github.com/apache/flink/pull/20151#issuecomment-1286649770 > As I understand, you mean that multiple ChannelStateWriterImpl share the same > ChannelStateWriteRequestExecutorImpl. When channel-state.number-of-tasks-share-file=5, each thread is responsible > for writing the channel state file for 5 subtasks, right? Since the file is written in a single thread, there is no need to consider thread safety issues. Yes, that's what I had in mind. > Your proposal should be clearer. I will try to refactor the code according to your proposal. Thanks again~ Great! But please keep in mind that I haven't thought it fully through and I haven't tried to implement it myself, so if you encounter some obstacles, feel free to reach out to me before going too deep!
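The single-writer idea discussed in this thread can be sketched as follows. This is illustrative Python, not the proposed Flink implementation: one dedicated thread drains write requests from several subtasks, so the shared channel-state file is only ever touched by that one thread and the file itself needs no locking.

```python
import queue
import threading

class SingleThreadWriter:
    """One writer thread serving several subtasks (here, a list stands in
    for the shared channel-state file)."""

    def __init__(self):
        self._requests = queue.Queue()
        self._written = []   # only ever mutated by the writer thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._requests.get()
            if item is None:          # shutdown sentinel
                break
            subtask_id, payload = item
            self._written.append((subtask_id, payload))  # single-threaded write

    def submit(self, subtask_id, payload):
        """Called concurrently by subtasks; only the queue is shared."""
        self._requests.put((subtask_id, payload))

    def close(self):
        self._requests.put(None)
        self._thread.join()

writer = SingleThreadWriter()   # shared by e.g. 5 subtasks
for subtask in range(5):
    writer.submit(subtask, f"state-{subtask}")
writer.close()
assert sorted(writer._written) == [(i, f"state-{i}") for i in range(5)]
```

The thread-safety boundary is the queue; everything downstream of it is single-threaded, which is the property the reviewer points out.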
[GitHub] [flink] xintongsong commented on a diff in pull request #21122: [FLINK-28889] Hybrid shuffle supports multiple consumer and broadcast optimization
xintongsong commented on code in PR #21122: URL: https://github.com/apache/flink/pull/21122#discussion_r1001497856 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java: ## @@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0); -private final Map subpartitionViewOperationsMap = -new ConcurrentHashMap<>(); +private final List> +subpartitionViewOperationsMap; Review Comment: Definition should be documented. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java: ## @@ -79,4 +79,24 @@ enum ConsumeStatus { /** The buffer is either consumed or not consumed. */ ALL } + +/** This class represents a pair of {@link ConsumeStatus} and consumer id. */ +class ConsumeStatusWithId { +public static final ConsumeStatusWithId ALL_CONSUME_STATUS = +new ConsumeStatusWithId(ConsumeStatus.ALL, -1); + +ConsumeStatus status; + +int consumerId; Review Comment: I think we need a dedicated class for the consumer id, where we can define the special values such as `ANY` and `SINGLE_CONSUMER`. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java: ## @@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0); -private final Map subpartitionViewOperationsMap = -new ConcurrentHashMap<>(); +private final List> +subpartitionViewOperationsMap; Review Comment: Can we replace the outer `List` with an array here, to align with `subpartitionMemoryDataManagers`? 
## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java: ## @@ -72,8 +73,8 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0); -private final Map subpartitionViewOperationsMap = -new ConcurrentHashMap<>(); +private final List> +subpartitionViewOperationsMap; Review Comment: Why is the inner a `Map` rather than a `List`? With a list, we won't need the `consumerIdCounter` in `HsResultPartition`. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java: ## @@ -161,15 +164,15 @@ public void append(ByteBuffer record, int targetChannel, Buffer.DataType dataTyp * subpartition. */ public HsDataView registerSubpartitionView( -int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) { +int subpartitionId, +int consumerId, +HsSubpartitionViewInternalOperations viewOperations) { HsSubpartitionViewInternalOperations oldView = - subpartitionViewOperationsMap.get(subpartitionId).put(consumerId, viewOperations); +Preconditions.checkState( +oldView == null, "Each subpartition view should have unique consumerId."); +return getSubpartitionMemoryDataManager(subpartitionId) Review Comment: Shall we also check that selective strategy should not have multiple consumers?
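The registration check under review can be sketched in miniature. This Python sketch is illustrative only, not the Flink code: each subpartition keeps a map from consumer id to its view, and registering the same consumer id twice is treated as a bug rather than a silent replacement of the old view.

```python
class SubpartitionViews:
    """Toy model of per-subpartition view registration with a
    unique-consumer-id invariant, analogous to the Preconditions.checkState
    call in the diff above."""

    def __init__(self, num_subpartitions):
        # one consumer-id -> view map per subpartition
        self._views = [dict() for _ in range(num_subpartitions)]

    def register(self, subpartition_id, consumer_id, view):
        old = self._views[subpartition_id].setdefault(consumer_id, view)
        if old is not view:
            raise AssertionError(
                "Each subpartition view should have a unique consumer id")
        return view

views = SubpartitionViews(2)
views.register(0, "consumer-a", object())
views.register(0, "consumer-b", object())      # different consumer: fine
try:
    views.register(0, "consumer-a", object())  # duplicate id: rejected
    raise RuntimeError("expected the duplicate registration to fail")
except AssertionError:
    pass
```

Failing fast here surfaces double registrations immediately, instead of the old behavior of logging and overwriting the previous view.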
[jira] [Created] (FLINK-29716) Separate slf4j jar in the lib folder from the distribution
Alexis Sarda-Espinosa created FLINK-29716: - Summary: Separate slf4j jar in the lib folder from the distribution Key: FLINK-29716 URL: https://issues.apache.org/jira/browse/FLINK-29716 Project: Flink Issue Type: Improvement Affects Versions: 1.15.2 Reporter: Alexis Sarda-Espinosa Flink's binary distribution ships several jars under the {{lib}} folder, including individual jars for all log4j artifacts. This makes it relatively easy to swap out those logging jars when necessary, for example when critical vulnerabilities are found (as was recently the case). With SLF4J 2.+, some breaking changes mean that many implementations are not directly backwards compatible, see for example the [notes for log4j2|https://logging.apache.org/log4j/2.x/log4j-slf4j-impl/index.html]. This means that, in the future, if swapping logging jars were necessary, the SLF4J jar might have to be changed as well. Right now the SLF4J jar is not included separately in the distribution; I believe it's packed inside the {{flink-dist}} jar, although I'm not sure. It would be better to separate it as is done for the default log4j2 jars.
[jira] [Comment Edited] (FLINK-29711) Topic notification not present in metadata after 60000 ms.
[ https://issues.apache.org/jira/browse/FLINK-29711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622124#comment-17622124 ] Durgesh Mishra edited comment on FLINK-29711 at 10/21/22 8:36 AM: -- Hello [~martijnvisser] We created one flink job and one Azure eventhub. Flink is processing real time data which published to Azure event hub and after continually running of flink job for about 10 hours above exception occurs. Used following configuration. # Checkpoints configurations checkpoints.interval= 24 checkpoints.minPauseBetweenCheckpoints= 12 checkpoints.timeout= 11 # Common Flink-Kafka-Connector(Source and Sink) configurations allow.auto.create.topics=false auto.offset.reset=latest request.timeout.ms=6 transaction.timeout.ms=6 kafka.semantic=1 kafka.internalProducerPoolSize=5 # For reducing the kafka timeout max.block.ms=5000 # For increasing the metadata fetch time metadata.max.idle.ms= 18 was (Author: JIRAUSER297316): Hello [~martijnvisser] We created one flink job and one Azure eventhub. Flink is processing real time data is published to Azure event hub and after contionusly running of flink job for about 10 hours above exception occurs. Used following configuration. # Checkpoints configurations checkpoints.interval= 24 checkpoints.minPauseBetweenCheckpoints= 12 checkpoints.timeout= 11 # Common Flink-Kafka-Connector(Source and Sink) configurations allow.auto.create.topics=false auto.offset.reset=latest request.timeout.ms=6 transaction.timeout.ms=6 kafka.semantic=1 kafka.internalProducerPoolSize=5 # For reducing the kafka timeout max.block.ms=5000 # For increasing the metadata fetch time metadata.max.idle.ms= 18 > Topic notification not present in metadata after 60000 ms.
> -- > > Key: FLINK-29711 > URL: https://issues.apache.org/jira/browse/FLINK-29711 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4, 1.14.6 >Reporter: Durgesh Mishra >Priority: Major > > Failed to send data to Kafka null with > FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, > closed=false} > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405) > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.base/java.lang.Thread.run(Unknown Source)
[jira] [Comment Edited] (FLINK-29711) Topic notification not present in metadata after 60000 ms.
[ https://issues.apache.org/jira/browse/FLINK-29711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622124#comment-17622124 ] Durgesh Mishra edited comment on FLINK-29711 at 10/21/22 8:35 AM: -- Hello [~martijnvisser] We created one flink job and one Azure eventhub. Flink is processing real time data is published to Azure event hub and after contionusly running of flink job for about 10 hours above exception occurs. Used following configuration. # Checkpoints configurations checkpoints.interval= 24 checkpoints.minPauseBetweenCheckpoints= 12 checkpoints.timeout= 11 # Common Flink-Kafka-Connector(Source and Sink) configurations allow.auto.create.topics=false auto.offset.reset=latest request.timeout.ms=6 transaction.timeout.ms=6 kafka.semantic=1 kafka.internalProducerPoolSize=5 # For reducing the kafka timeout max.block.ms=5000 # For increasing the metadata fetch time metadata.max.idle.ms= 18 was (Author: JIRAUSER297316): Hello [~martijnvisser] We created one flink application and one Azure eventhub. Flink is processing real time data is published to Azure event hub and after contionusly running of flink job for about 10 hours above exception occurs. Used following configuration. # Checkpoints configurations checkpoints.interval= 24 checkpoints.minPauseBetweenCheckpoints= 12 checkpoints.timeout= 11 # Common Flink-Kafka-Connector(Source and Sink) configurations allow.auto.create.topics=false auto.offset.reset=latest request.timeout.ms=6 transaction.timeout.ms=6 kafka.semantic=1 kafka.internalProducerPoolSize=5 # For reducing the kafka timeout max.block.ms=5000 # For increasing the metadata fetch time metadata.max.idle.ms= 18 > Topic notification not present in metadata after 60000 ms.
> -- > > Key: FLINK-29711 > URL: https://issues.apache.org/jira/browse/FLINK-29711 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4, 1.14.6 >Reporter: Durgesh Mishra >Priority: Major > > Failed to send data to Kafka null with > FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, > closed=false} > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405) > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.base/java.lang.Thread.run(Unknown Source)
[jira] [Commented] (FLINK-29711) Topic notification not present in metadata after 60000 ms.
[ https://issues.apache.org/jira/browse/FLINK-29711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622124#comment-17622124 ] Durgesh Mishra commented on FLINK-29711: Hello [~martijnvisser] We created one Flink application and one Azure Event Hub. Flink processes real-time data published to the Azure Event Hub, and after the Flink job had been running continuously for about 10 hours the above exception occurred. We used the following configuration. # Checkpoints configurations checkpoints.interval= 24 checkpoints.minPauseBetweenCheckpoints= 12 checkpoints.timeout= 11 # Common Flink-Kafka-Connector(Source and Sink) configurations allow.auto.create.topics=false auto.offset.reset=latest request.timeout.ms=6 transaction.timeout.ms=6 kafka.semantic=1 kafka.internalProducerPoolSize=5 # For reducing the kafka timeout max.block.ms=5000 # For increasing the metadata fetch time metadata.max.idle.ms= 18 > Topic notification not present in metadata after 60000 ms. > -- > > Key: FLINK-29711 > URL: https://issues.apache.org/jira/browse/FLINK-29711 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4, 1.14.6 >Reporter: Durgesh Mishra >Priority: Major > > Failed to send data to Kafka null with > FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, > closed=false} > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405) > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at >
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.base/java.lang.Thread.run(Unknown Source)
[jira] [Commented] (FLINK-21239) Upgrade Calcite version to 1.28
[ https://issues.apache.org/jira/browse/FLINK-21239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622120#comment-17622120 ] Sergey Nuyanzin commented on FLINK-21239: - Thanks for the assignment, yes I would take it. > Upgrade Calcite version to 1.28 > --- > > Key: FLINK-21239 > URL: https://issues.apache.org/jira/browse/FLINK-21239 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > The following files should be removed from the Flink code base during an > upgrade: > - org.apache.calcite.rex.RexLiteral
[jira] [Created] (FLINK-29715) Expose max_parallelism in JSON plan
Gyula Fora created FLINK-29715: -- Summary: Expose max_parallelism in JSON plan Key: FLINK-29715 URL: https://issues.apache.org/jira/browse/FLINK-29715 Project: Flink Issue Type: Improvement Components: Runtime / REST Reporter: Gyula Fora Assignee: Gyula Fora The JobGraph json plan currently only contains vertex parallelism but not the max_parallelism. This could be very useful information to also show on the UI for debugging data skew/performance issues or for any tooling that relies on the jobgraph information gathered from the rest endpoint.
[jira] [Commented] (FLINK-24932) Frocksdb cannot run on Apple M1
[ https://issues.apache.org/jira/browse/FLINK-24932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622115#comment-17622115 ] Yun Tang commented on FLINK-24932: -- [~martijnvisser] We have built and released a package which only rebuilds the mac-related .so binaries in https://search.maven.org/artifact/io.github.myasuka/frocksdbjni/6.20.3-ververica-1.1/jar ; however, we could not get the PR (https://github.com/ververica/frocksdb/pull/50) merged before the flink-1.16 feature freeze due to a lack of review resources. I think it could be done in flink-1.17. > Frocksdb cannot run on Apple M1 > --- > > Key: FLINK-24932 > URL: https://issues.apache.org/jira/browse/FLINK-24932 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Minor > Fix For: 1.17.0 > > > After we bump up RocksDB version to 6.20.3, we support running RocksDB on > linux arm clusters. However, according to the feedback from Robert, Apple M1 > machines cannot run FRocksDB yet: > {code:java} > java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > ~[flink-runtime-1.14.0.jar:1.14.0] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > ~[flink-runtime-1.14.0.jar:1.14.0] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > ~[flink-runtime-1.14.0.jar:1.14.0] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > ~[flink-runtime-1.14.0.jar:1.14.0] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for StreamFlatMap_c21234bcbf1e8eb4c61f1927190efebd_(1/1) from > any of the 1 provided restore options. 
> at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > ... 11 more > Caused by: java.io.IOException: Could not load the native RocksDB library > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:882) > ~[flink-statebackend-rocksdb_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:402) > ~[flink-statebackend-rocksdb_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:345) > ~[flink-statebackend-rocksdb_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:87) > ~[flink-statebackend-rocksdb_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) > ~[flink-streaming-java_2.11-1.14.0.jar:1.14.0] > at >
[GitHub] [flink] wanglijie95 commented on pull request #20097: [FLINK-28284][Connectors/Jdbc] Add JdbcSink with new format
wanglijie95 commented on PR #20097: URL: https://github.com/apache/flink/pull/20097#issuecomment-1286625764 I will review this in the next few weeks :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29711) Topic notification not present in metadata after 60000 ms.
[ https://issues.apache.org/jira/browse/FLINK-29711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622105#comment-17622105 ] Martijn Visser commented on FLINK-29711: Please add more information to the ticket beyond the stack trace: how did you end up with this error, and what steps can be taken to reproduce the issue? > Topic notification not present in metadata after 60000 ms. > -- > > Key: FLINK-29711 > URL: https://issues.apache.org/jira/browse/FLINK-29711 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4, 1.14.6 >Reporter: Durgesh Mishra >Priority: Major > > Failed to send data to Kafka null with > FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, > closed=false} > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405) > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.base/java.lang.Thread.run(Unknown Source) -- This message was sent by Atlassian Jira (v8.20.10#820010)
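For context on the timeout in the subject: the message "Topic ... not present in metadata after 60000 ms" is raised by the Kafka client when topic metadata cannot be fetched within `max.block.ms` (default 60000 ms), which usually points to connectivity, authorization, or a missing topic rather than a Flink bug. A minimal sketch of where that setting lives; the broker address and the raised value are illustrative assumptions, not values from this ticket:

```java
import java.util.Properties;

// Minimal sketch of a producer configuration that raises the metadata wait.
// The bootstrap address and the 120000 ms value are illustrative assumptions.
public class KafkaTimeoutConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // hypothetical address
        // max.block.ms bounds how long send() may block waiting for topic
        // metadata; its 60000 ms default matches the timeout in the error.
        props.setProperty("max.block.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("max.block.ms"));
    }
}
```

Raising the timeout only buys time, of course; if the topic genuinely does not exist or the brokers are unreachable, the error will reappear at the new bound.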
[GitHub] [flink] echauchot commented on pull request #21073: [FLINK-26822] Add Source for Cassandra connector
echauchot commented on PR #21073: URL: https://github.com/apache/flink/pull/21073#issuecomment-1286623220 > > I don't know Flink specifics on reviews but can a non-committer give LGTM and then a committer simply merge ? > > > It could [...] > > Correction: No. The committer _must_ check the PR. But as Martijn said it can speed things up. Yes, this is what I do on the Beam project: even though the project allows reviews by non-committers, I never merge a PR if I have not reviewed all of it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29714) Merge TableWrite and TableCompact into one interface
[ https://issues.apache.org/jira/browse/FLINK-29714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29714: --- Labels: pull-request-available (was: ) > Merge TableWrite and TableCompact into one interface > > > Key: FLINK-29714 > URL: https://issues.apache.org/jira/browse/FLINK-29714 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > > To make sure that full compaction is triggered constantly for every written > bucket regardless of failure, we need to add {{compact}} interface to > {{TableWrite}} so that Flink sink operators can trigger compaction when > needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-29714) Merge TableWrite and TableCompact into one interface
[ https://issues.apache.org/jira/browse/FLINK-29714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng reassigned FLINK-29714: --- Assignee: Caizhi Weng > Merge TableWrite and TableCompact into one interface > > > Key: FLINK-29714 > URL: https://issues.apache.org/jira/browse/FLINK-29714 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Fix For: table-store-0.3.0 > > > To make sure that full compaction is triggered constantly for every written > bucket regardless of failure, we need to add {{compact}} interface to > {{TableWrite}} so that Flink sink operators can trigger compaction when > needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-table-store] tsreaper opened a new pull request, #328: [FLINK-29714] Merge TableWrite and TableCompact into one interface
tsreaper opened a new pull request, #328: URL: https://github.com/apache/flink-table-store/pull/328 To make sure that full compaction is triggered constantly for every written bucket regardless of failure, we need to add `compact` interface to `TableWrite` so that Flink sink operators can trigger compaction when needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-29714) Merge TableWrite and TableCompact into one interface
Caizhi Weng created FLINK-29714: --- Summary: Merge TableWrite and TableCompact into one interface Key: FLINK-29714 URL: https://issues.apache.org/jira/browse/FLINK-29714 Project: Flink Issue Type: Sub-task Components: Table Store Reporter: Caizhi Weng Fix For: table-store-0.3.0 To make sure that full compaction is triggered constantly for every written bucket regardless of failure, we need to add {{compact}} interface to {{TableWrite}} so that Flink sink operators can trigger compaction when needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
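The merge described in FLINK-29714 can be pictured with a small sketch: `TableWrite` also exposes `compact`, so a sink can re-trigger full compaction for a written bucket after a failure. Everything below except the `TableWrite`/`compact` names is an illustrative assumption, not the actual Table Store API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical merged interface: writing and compaction on one object.
interface TableWrite {
    void write(String record);

    // Formerly the job of a separate TableCompact interface.
    void compact(int bucket, boolean fullCompaction);
}

// Stub implementation that only records what was requested, to show the call site.
class InMemoryTableWrite implements TableWrite {
    final List<String> buffer = new ArrayList<>();
    int compactions = 0;

    @Override
    public void write(String record) {
        buffer.add(record);
    }

    @Override
    public void compact(int bucket, boolean fullCompaction) {
        // A real implementation would rewrite the bucket's files; this stub
        // only counts invocations.
        compactions++;
    }
}

public class MergedWriteDemo {
    public static void main(String[] args) {
        InMemoryTableWrite write = new InMemoryTableWrite();
        write.write("record-1");
        write.compact(0, true); // the sink operator decides when to compact
        System.out.println(write.buffer.size() + " record(s), "
                + write.compactions + " compaction(s)");
    }
}
```

The design point is that the sink operator, which knows about checkpoints and failures, holds a single handle through which it can both write and trigger compaction.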
[jira] [Closed] (FLINK-29612) Extract changelog files out of DataFileMeta#extraFiles
[ https://issues.apache.org/jira/browse/FLINK-29612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng closed FLINK-29612. --- Resolution: Fixed > Extract changelog files out of DataFileMeta#extraFiles > -- > > Key: FLINK-29612 > URL: https://issues.apache.org/jira/browse/FLINK-29612 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.3.0 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > > Currently changelog files are stored as extra files in {{DataFileMeta}}. > However for the full compaction changelog we're about to introduce, it cannot > be added as extra files because their statistics might be different from the > corresponding merge tree files. > We need to extract changelog files out of DataFileMeta#extraFiles. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-29612) Extract changelog files out of DataFileMeta#extraFiles
[ https://issues.apache.org/jira/browse/FLINK-29612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng reassigned FLINK-29612: --- Assignee: Caizhi Weng > Extract changelog files out of DataFileMeta#extraFiles > -- > > Key: FLINK-29612 > URL: https://issues.apache.org/jira/browse/FLINK-29612 > Project: Flink > Issue Type: Sub-task > Components: Table Store >Affects Versions: table-store-0.3.0 >Reporter: Caizhi Weng >Assignee: Caizhi Weng >Priority: Major > Labels: pull-request-available > Fix For: table-store-0.3.0 > > > Currently changelog files are stored as extra files in {{DataFileMeta}}. > However for the full compaction changelog we're about to introduce, it cannot > be added as extra files because their statistics might be different from the > corresponding merge tree files. > We need to extract changelog files out of DataFileMeta#extraFiles. -- This message was sent by Atlassian Jira (v8.20.10#820010)
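The refactoring in FLINK-29612 amounts to giving changelog files their own field instead of smuggling them through a generic `extraFiles` list, so their statistics can diverge from the merge-tree data files. A hypothetical sketch; the class name mirrors the ticket but the fields and constructor are assumptions, not the real Table Store code:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of the metadata after extraction: changelog files are no longer
// mixed into extraFiles, so each list can carry its own semantics.
class DataFileMeta {
    final String fileName;
    final List<String> extraFiles;      // auxiliary files only, no changelogs
    final List<String> changelogFiles;  // extracted out of extraFiles

    DataFileMeta(String fileName, List<String> extraFiles, List<String> changelogFiles) {
        this.fileName = fileName;
        this.extraFiles = Collections.unmodifiableList(extraFiles);
        this.changelogFiles = Collections.unmodifiableList(changelogFiles);
    }
}

public class ChangelogExtractionDemo {
    public static void main(String[] args) {
        DataFileMeta meta = new DataFileMeta(
                "data-0.orc",                   // hypothetical file names
                Arrays.asList("index-0"),
                Arrays.asList("changelog-0"));
        System.out.println(meta.changelogFiles);
    }
}
```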
[jira] [Created] (FLINK-29713) Kubernetes operator should restart failed jobs
Peter Vary created FLINK-29713: -- Summary: Kubernetes operator should restart failed jobs Key: FLINK-29713 URL: https://issues.apache.org/jira/browse/FLINK-29713 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Peter Vary It would be good to have the possibility to restart the Flink application if it goes to the {{FAILED}} state. This could be used to restart and reconfigure the job dynamically in the application {{main}} method if the current application cannot handle the incoming data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
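The "reconfigure dynamically in {{main}}" idea from the ticket could look roughly like this: on each restart the application inspects how often it has already been restarted and picks a more conservative configuration. The environment variable name and the back-off policy below are assumptions for illustration, not operator API:

```java
// Sketch of a restart-aware main method: read a (hypothetical) restart
// counter and derive a reduced parallelism from it.
public class RestartAwareMain {
    static int parseAttempt(String raw) {
        try {
            return raw == null ? 0 : Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return 0; // treat unreadable values as a first attempt
        }
    }

    static int parallelismFor(int attempt) {
        // Halve a baseline parallelism of 8 on every restart, floor of 1.
        return Math.max(1, 8 >> Math.min(Math.max(attempt, 0), 31));
    }

    public static void main(String[] args) {
        // RESTART_ATTEMPT is a hypothetical variable the operator would set.
        int attempt = parseAttempt(System.getenv("RESTART_ATTEMPT"));
        System.out.println("parallelism=" + parallelismFor(attempt));
    }
}
```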
[jira] [Closed] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2
[ https://issues.apache.org/jira/browse/FLINK-29712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] macdoor615 closed FLINK-29712. -- Resolution: Fixed upgraded client to Flink 1.16.0-rc2 from 1.16.0-rc1. Problem solved > The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in > 1.16.0-rc2 > > > Key: FLINK-29712 > URL: https://issues.apache.org/jira/browse/FLINK-29712 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 > Environment: Flink 1.16.0-rc2 > Hive 3.1.3 > Hadoop 3.3.4 >Reporter: macdoor615 >Priority: Blocker > Fix For: 1.16.0 > > Attachments: flink-conf.yaml > > > All my batch jobs have failed with same error. All streaming jobs work fine. > {code:java} > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, > backoffTimeMS=6) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:299) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:635) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > Caused by: org.apache.flink.util.FlinkException: Global failure triggered by > OperatorCoordinator for 'Source: p_hswtv[4] -> Calc[5]' (operator > 6cdc5bb954874d922eaee11a8e7b5dd5). 
> at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:617) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237) > at > org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:360) > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:217) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:198) > at >
[jira] [Updated] (FLINK-29711) Topic notification not present in metadata after 60000 ms.
[ https://issues.apache.org/jira/browse/FLINK-29711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-29711: --- Component/s: Connectors / Kafka (was: API / Core) > Topic notification not present in metadata after 60000 ms. > -- > > Key: FLINK-29711 > URL: https://issues.apache.org/jira/browse/FLINK-29711 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.4, 1.14.6 >Reporter: Durgesh Mishra >Priority: Major > > Failed to send data to Kafka null with > FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, > closed=false} > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405) > at > org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.base/java.lang.Thread.run(Unknown Source) -- This message was sent by Atlassian Jira 
(v8.20.10#820010)
[jira] [Commented] (FLINK-29712) The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in 1.16.0-rc2
[ https://issues.apache.org/jira/browse/FLINK-29712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622095#comment-17622095 ] macdoor615 commented on FLINK-29712: @luoyuxia I upgraded client to Flink 1.16.0-rc2 from 1.16.0-rc1. Problem solved. Thx > The same batch task works fine in 1.15.2 and 1.16.0-rc1, but fails in > 1.16.0-rc2 > > > Key: FLINK-29712 > URL: https://issues.apache.org/jira/browse/FLINK-29712 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 > Environment: Flink 1.16.0-rc2 > Hive 3.1.3 > Hadoop 3.3.4 >Reporter: macdoor615 >Priority: Blocker > Fix For: 1.16.0 > > Attachments: flink-conf.yaml > > > All my batch jobs have failed with same error. All streaming jobs work fine. > {code:java} > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, > backoffTimeMS=6) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:299) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:635) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) > at > 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > Caused by: org.apache.flink.util.FlinkException: Global failure triggered by > OperatorCoordinator for 'Source: p_hswtv[4] -> Calc[5]' (operator > 6cdc5bb954874d922eaee11a8e7b5dd5). 
> at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:617) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237) > at > org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:360) > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:217) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315) > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:198)
[jira] [Commented] (FLINK-24119) KafkaITCase.testTimestamps fails due to "Topic xxx already exist"
[ https://issues.apache.org/jira/browse/FLINK-24119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622093#comment-17622093 ] Martijn Visser commented on FLINK-24119: [~renqs] There's still something wrong, can you take a look? What has changed is that the images for Confluent Platform have been upgraded to the same Kafka version as we have the client (via FLINK-28405), but I believe that's just made the issue visible again because apparently it was not resolved. > KafkaITCase.testTimestamps fails due to "Topic xxx already exist" > - > > Key: FLINK-24119 > URL: https://issues.apache.org/jira/browse/FLINK-24119 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.14.0, 1.15.0, 1.16.0 >Reporter: Xintong Song >Assignee: Qingsheng Ren >Priority: Critical > Labels: auto-deprioritized-critical, test-stability > Fix For: 1.16.1 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23328=logs=c5f0071e-1851-543e-9a45-9ac140befc32=15a22db7-8faa-5b34-3920-d33c9f0ca23c=7419 > {code} > Sep 01 15:53:20 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 162.65 s <<< FAILURE! - in > org.apache.flink.streaming.connectors.kafka.KafkaITCase > Sep 01 15:53:20 [ERROR] testTimestamps Time elapsed: 23.237 s <<< FAILURE! > Sep 01 15:53:20 java.lang.AssertionError: Create test topic : tstopic failed, > org.apache.kafka.common.errors.TopicExistsException: Topic 'tstopic' already > exists. 
> Sep 01 15:53:20 at org.junit.Assert.fail(Assert.java:89) > Sep 01 15:53:20 at > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226) > Sep 01 15:53:20 at > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112) > Sep 01 15:53:20 at > org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212) > Sep 01 15:53:20 at > org.apache.flink.streaming.connectors.kafka.KafkaITCase.testTimestamps(KafkaITCase.java:191) > Sep 01 15:53:20 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Sep 01 15:53:20 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Sep 01 15:53:20 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Sep 01 15:53:20 at java.lang.reflect.Method.invoke(Method.java:498) > Sep 01 15:53:20 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Sep 01 15:53:20 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Sep 01 15:53:20 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Sep 01 15:53:20 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Sep 01 15:53:20 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > Sep 01 15:53:20 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > Sep 01 15:53:20 at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > Sep 01 15:53:20 at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
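The "Topic 'tstopic' already exists" failure above is a classic leftover-state problem: a previous test created the topic and it was never deleted. One common mitigation, sketched here as a generic pattern (not necessarily the fix the Flink test suite adopts), is a unique topic name per test run:

```java
import java.util.UUID;

// Generic flaky-test mitigation: derive a fresh topic name for every run so
// a stale topic from an earlier run cannot collide with the new one.
public class UniqueTopicNames {
    static String uniqueTopic(String base) {
        // A random UUID suffix makes collisions practically impossible.
        return base + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(uniqueTopic("tstopic"));
    }
}
```

The trade-off is that unique names require cleanup of old topics elsewhere, otherwise the broker accumulates garbage; the alternative is deleting and awaiting removal of the fixed-name topic before each test.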
[jira] [Updated] (FLINK-29540) SubQueryAntiJoinTest started to fail after Calcite 1.27
[ https://issues.apache.org/jira/browse/FLINK-29540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-29540: - Parent Issue: FLINK-21239 (was: FLINK-20873) > SubQueryAntiJoinTest started to fail after Calcite 1.27 > --- > > Key: FLINK-29540 > URL: https://issues.apache.org/jira/browse/FLINK-29540 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Sergey Nuyanzin >Priority: Major > > Probably the reason is https://issues.apache.org/jira/browse/CALCITE-4560 > > some tests are failing with > {noformat} > java.lang.NullPointerException > at > org.apache.calcite.sql2rel.RelDecorrelator.createValueGenerator(RelDecorrelator.java:858) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateInputWithValueGenerator(RelDecorrelator.java:1070) > at > org.apache.calcite.sql2rel.RelDecorrelator.maybeAddValueGenerator(RelDecorrelator.java:987) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1199) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1165) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:531) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:729) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:771) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:760) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:531) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:729) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1236) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1218) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:531) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:729) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:771) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:760) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:531) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:729) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1186) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1165) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at 
org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:531) > at > org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:729) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:771) > at > org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:760) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at >
[jira] [Updated] (FLINK-29237) RexSimplify can not be removed after update to calcite 1.27
[ https://issues.apache.org/jira/browse/FLINK-29237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-29237: - Parent Issue: FLINK-21239 (was: FLINK-20873) > RexSimplify can not be removed after update to calcite 1.27 > --- > > Key: FLINK-29237 > URL: https://issues.apache.org/jira/browse/FLINK-29237 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Planner >Reporter: Sergey Nuyanzin >Priority: Major > > It seems some work needs to be done to make this happen. > Currently, removing RexSimplify from the Flink repo leads to failures in several > tests, such as > {{IntervalJoinTest#testFallbackToRegularJoin}} > {{CalcITCase#testOrWithIsNullInIf}} > {{CalcITCase#testOrWithIsNullPredicate}} > example of failure > {noformat} > Sep 07 11:25:08 java.lang.AssertionError: > Sep 07 11:25:08 > Sep 07 11:25:08 Results do not match for query: > Sep 07 11:25:08 > Sep 07 11:25:08 SELECT * FROM NullTable3 AS T > Sep 07 11:25:08 WHERE T.a = 1 OR T.a = 3 OR T.a IS NULL > Sep 07 11:25:08 > Sep 07 11:25:08 > Sep 07 11:25:08 Results > Sep 07 11:25:08 == Correct Result - 4 == == Actual Result - 2 == > Sep 07 11:25:08 +I[1, 1, Hi] +I[1, 1, Hi] > Sep 07 11:25:08 +I[3, 2, Hello world] +I[3, 2, Hello world] > Sep 07 11:25:08 !+I[null, 999, NullTuple] > Sep 07 11:25:08 !+I[null, 999, NullTuple] > Sep 07 11:25:08 > Sep 07 11:25:08 Plan: > Sep 07 11:25:08 == Abstract Syntax Tree == > Sep 07 11:25:08 LogicalProject(inputs=[0..2]) > Sep 07 11:25:08 +- LogicalFilter(condition=[OR(=($0, 1), =($0, 3), IS > NULL($0))]) > Sep 07 11:25:08+- LogicalTableScan(table=[[default_catalog, > default_database, NullTable3]]) > Sep 07 11:25:08 > Sep 07 11:25:08 == Optimized Logical Plan == > Sep 07 11:25:08 Calc(select=[a, b, c], where=[SEARCH(a, Sarg[1, 3; NULL AS > TRUE])]) > Sep 07 11:25:08 +- BoundedStreamScan(table=[[default_catalog, > default_database, NullTable3]], fields=[a, b, c]) > Sep 07 11:25:08 > Sep 07 11:25:08 > Sep 07 
11:25:08 at org.junit.Assert.fail(Assert.java:89) > Sep 07 11:25:08 at > org.apache.flink.table.planner.runtime.utils.BatchTestBase.$anonfun$check$1(BatchTestBase.scala:154) > Sep 07 11:25:08 at > org.apache.flink.table.planner.runtime.utils.BatchTestBase.$anonfun$check$1$adapted(BatchTestBase.scala:147) > Sep 07 11:25:08 at scala.Option.foreach(Option.scala:257) > Sep 07 11:25:08 at > org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:147) > Sep 07 11:25:08 at > org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162) > Sep 07 11:25:08 at > org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562) > Sep 07 11:25:08 at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548) > Sep 07 11:25:08 > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
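The failing predicate's intended semantics can be sketched outside of Flink. The following is a minimal plain-Java illustration (not Flink or Calcite code; the class and method names are made up for this sketch) of what `SEARCH(a, Sarg[1, 3; NULL AS TRUE])` should mean: values 1 and 3 match, and NULL values also match because of the `NULL AS TRUE` marker, which is why the two `NullTuple` rows belong in the correct result.

```java
import java.util.Arrays;
import java.util.List;

public class SargNullAsTrueSketch {
    // Sketch of SEARCH(a, Sarg[1, 3; NULL AS TRUE]):
    // a = 1 matches, a = 3 matches, and NULL matches ("NULL AS TRUE").
    static boolean matches(Integer a) {
        return a == null || a == 1 || a == 3;
    }

    public static void main(String[] args) {
        // Column a of NullTable3, taken from the failure output above.
        List<Integer> columnA = Arrays.asList(1, 3, null, null);
        long matching = columnA.stream().filter(SargNullAsTrueSketch::matches).count();
        // All four rows satisfy the predicate ("Correct Result - 4");
        // the regression dropped the two NULL rows ("Actual Result - 2").
        System.out.println(matching); // prints 4
    }
}
```

A simplification that rewrites the OR chain into a SEARCH/Sarg must preserve exactly this three-valued-logic behavior; losing the `NULL AS TRUE` flag turns the 4-row result into the observed 2-row result.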
[jira] [Updated] (FLINK-29203) Support optimization of Union(all, Values, Values) to Values
     [ https://issues.apache.org/jira/browse/FLINK-29203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-29203:
---------------------------------
    Parent: (was: FLINK-20873)
    Issue Type: Improvement  (was: Sub-task)

> Support optimization of Union(all, Values, Values) to Values
> ------------------------------------------------------------
>
> Key: FLINK-29203
> URL: https://issues.apache.org/jira/browse/FLINK-29203
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API, Table SQL / Planner
> Reporter: Sergey Nuyanzin
> Priority: Major
>
> This optimization was introduced at https://issues.apache.org/jira/browse/CALCITE-4383
> There are several issues with it:
> 1. UNION ALL now tries to cast to the least restrictive type [1]; as a result, SetOperatorsITCase#testUnionAllWithCommonType fails as shown below.
> 2. JoinITCase#testUncorrelatedScalar fails as described at https://issues.apache.org/jira/browse/FLINK-29204
> 3. The result of org.apache.calcite.plan.hep.HepPlanner#findBestExp could be empty for LogicalValues after such an optimization.
> {noformat}
> org.apache.flink.table.planner.codegen.CodeGenException: Incompatible types of expression and result type. Expression[GeneratedExpression(((int) 12),false,,INT NOT NULL,Some(12))] type is [INT NOT NULL], result type is [DECIMAL(13, 3) NOT NULL]
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateResultExpression$2(ExprCodeGenerator.scala:309)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateResultExpression$2$adapted(ExprCodeGenerator.scala:293)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateResultExpression(ExprCodeGenerator.scala:293)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateResultExpression(ExprCodeGenerator.scala:247)
>     at org.apache.flink.table.planner.codegen.ValuesCodeGenerator$.$anonfun$generatorInputFormat$1(ValuesCodeGenerator.scala:45)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.codegen.ValuesCodeGenerator$.generatorInputFormat(ValuesCodeGenerator.scala:43)
>     at org.apache.flink.table.planner.codegen.ValuesCodeGenerator.generatorInputFormat(ValuesCodeGenerator.scala)
>     at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecValues.translateToPlanInternal(CommonExecValues.java:66)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues.translateToPlanInternal(BatchExecValues.java:57)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
>     at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:65)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:93)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at
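The type mismatch behind the CodeGenException above can be sketched in plain Java. This is not Flink or Calcite code; the decimal operand value is hypothetical (only the `DECIMAL(13, 3)` result type appears in the exception). It illustrates the least-restrictive-type rule: when UNION ALL merges an INT branch with a DECIMAL(13, 3) branch, the common type is DECIMAL(13, 3), so the INT literal 12 must be cast before code generation. The failure occurs because the planner still hands codegen the raw `INT NOT NULL` expression where a `DECIMAL(13, 3) NOT NULL` result is expected.

```java
import java.math.BigDecimal;

public class LeastRestrictiveTypeSketch {
    public static void main(String[] args) {
        // Branch 1 of the UNION ALL: the INT literal 12, widened to the common
        // type, i.e. CAST(12 AS DECIMAL(13, 3)) -> scale 3.
        BigDecimal fromInt = new BigDecimal(12).setScale(3);

        // Branch 2 (hypothetical value): a literal that already needs
        // DECIMAL(13, 3) -- 13 digits of precision, 3 of scale.
        BigDecimal decimalRow = new BigDecimal("1234567890.123");

        // After the cast both rows share one type, which is what the generated
        // VALUES code requires; emitting the raw int instead is the reported bug.
        System.out.println(fromInt);    // prints 12.000
        System.out.println(decimalRow); // prints 1234567890.123
    }
}
```

In other words, folding `Union(all, Values, Values)` into a single `Values` is only safe once every row expression has been rewritten to the least restrictive row type.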