Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1452023246 ## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ## @@ -17,55 +17,82 @@ package org.apache.flink.kubernetes.operator.admission.mutator; +import org.apache.flink.kubernetes.operator.admission.informer.InformerManager; import org.apache.flink.kubernetes.operator.api.CrdConstants; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.Cache; import io.javaoperatorsdk.webhook.admission.NotAllowedException; import io.javaoperatorsdk.webhook.admission.Operation; import io.javaoperatorsdk.webhook.admission.mutation.Mutator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; +import java.util.Optional; +import java.util.Set; /** The default mutator. 
*/ public class FlinkMutator implements Mutator { private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class); private static final ObjectMapper mapper = new ObjectMapper(); +private final Set mutators; +private final InformerManager informerManager; + +public FlinkMutator(Set mutators, InformerManager informerManager) { +this.mutators = mutators; +this.informerManager = informerManager; +} @Override public HasMetadata mutate(HasMetadata resource, Operation operation) throws NotAllowedException { -if (operation == Operation.CREATE) { +if (operation == Operation.CREATE || operation == Operation.UPDATE) { LOG.debug("Mutating resource {}", resource); - if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) { -try { -var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class); -setSessionTargetLabel(sessionJob); -return sessionJob; -} catch (Exception e) { -throw new RuntimeException(e); -} +return mutateSessionJob(resource); +} +if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) { +return mutateDeployment(resource); } } return resource; } -private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) { -var labels = flinkSessionJob.getMetadata().getLabels(); -if (labels == null) { -labels = new HashMap<>(); +private FlinkSessionJob mutateSessionJob(HasMetadata resource) { +try { +var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class); +var namespace = sessionJob.getMetadata().getNamespace(); +var deploymentName = sessionJob.getSpec().getDeploymentName(); +var key = Cache.namespaceKeyFunc(namespace, deploymentName); +var deployment = + informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key); + +for (FlinkResourceMutator mutator : mutators) { +FlinkSessionJob flinkSessionJob = +mutator.mutateSessionJob(sessionJob, Optional.ofNullable(deployment)); +sessionJob = flinkSessionJob; Review Comment: Simplify to : ``` sessionJob = mutator.mutateSessionJob(sessionJob, 
Optional.ofNullable(deployment)); ``` ## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ## @@ -17,55 +17,82 @@ package org.apache.flink.kubernetes.operator.admission.mutator; +import org.apache.flink.kubernetes.operator.admission.informer.InformerManager; import org.apache.flink.kubernetes.operator.api.CrdConstants; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.Cache; import io.javaoperatorsdk.webhook.admission.NotAllowedException; import io.javaoperatorsdk.webhook.admission.Operation; import io.javaoperatorsdk.webhook.admission.mutation.Mutator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; +import java.util.Optional; +import java.util.Set; /** The default mutator. */ public class FlinkMutator implements Mutator { private static final Logger LOG =
[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Khanh Vu updated FLINK-34076: - Description: The following issue occurs with flink-kinesis-connector v4.2.0 and Flink 1.17; it works properly with kinesis connector v4.1.0 (I have not tested versions before v4.1.0). The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused the Kinesis sink to fail to be created when using the Table API, as required classes from `flink-connector-base` are not loaded at runtime. E.g. with only the following dependency in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} and a minimal job definition: {code:java} public static void main(String[] args) throws Exception { // create data stream environment StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv); Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build(); TableDescriptor descriptor = TableDescriptor.forConnector("kinesis") .schema(a) .format("json") .build(); tEnv.createTemporaryTable("sinkTable", descriptor); tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print(); } {code} the following exception is thrown: {code:java} Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?] at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] ...
28 more {code} The fix is to explicitly specify `flink-connector-base` as dependency of the project: {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided {code} In general, `flink-connector-base` should be pulled in by default when pulling in the kinesis connector, the current separation adds unnecessary hassle to use the connector. was: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in runtime. E.g. with following depenency only in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} and a minimal job definition: {code:java} public static void main(String[] args) throws Exception { // create data stream environment StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv); Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build(); TableDescriptor descriptor = TableDescriptor.forConnector("kinesis") .schema(a) .format("json") .build(); tEnv.createTemporaryTable("sinkTable", descriptor); tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print(); } {code} following exception will be thrown: {code:java} Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?] at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] ... 28 more {code} The fix is to
[jira] [Commented] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806669#comment-17806669 ] Khanh Vu commented on FLINK-34076: -- Hi [~jiabao.sun], Yes, if I have `flink-connector-base` in the dependency list, it runs properly (that's the fix I mentioned), but if I leave the base out, it fails. Before the aforementioned commit, I only needed `flink-connector-kinesis` in the list (without `flink-connector-base`). It is not correct for the connector to depend on `flink-connector-base` at runtime without declaring it as a (transitive) dependency. > flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Khanh Vu >Priority: Major > Attachments: screenshot-1.png > > > The > [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] > which stops bundling `flink-connector-base` with `flink-connector-kinesis` > has caused the Kinesis sink to fail to be created when using the Table API, as required > classes from `flink-connector-base` are not loaded at runtime. > E.g.
with only the following dependency in pom.xml > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > {code} > and a minimal job definition: > {code:java} > public static void main(String[] args) throws Exception { > // create data stream environment > StreamExecutionEnvironment sEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); > StreamTableEnvironment tEnv = > StreamTableEnvironment.create(sEnv); > Schema a = Schema.newBuilder().column("a", > DataTypes.STRING()).build(); > TableDescriptor descriptor = > TableDescriptor.forConnector("kinesis") > .schema(a) > .format("json") > .build(); > tEnv.createTemporaryTable("sinkTable", descriptor); > tEnv.executeSql("CREATE TABLE sinkTable " + > descriptor.toString()).print(); > } > {code} > the following exception is thrown: > {code:java} > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory > at > jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > ~[?:?] > at > jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > ~[?:?] > at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] > ... 28 more > {code} > The fix is to explicitly specify `flink-connector-base` as a dependency of the > project: > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > > org.apache.flink > flink-connector-base > ${flink.version} > provided > > {code} > In general, `flink-connector-base` should be pulled in by default when > pulling in the kinesis connector; the current separation adds unnecessary > hassle when using the connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33268][rest] Skip unknown fields in REST response deserialization [flink]
gyfora commented on PR #23930: URL: https://github.com/apache/flink/pull/23930#issuecomment-1891493012 Looks good @gaborgsomogyi -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent
[ https://issues.apache.org/jira/browse/FLINK-33998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806664#comment-17806664 ] Matthias Pohl commented on FLINK-33998: --- I couldn't find anything that sounds related to your issue in the release notes of [Flink 1.14.0|https://nightlies.apache.org/flink/flink-docs-release-1.18/release-notes/flink-1.14/#runtime--coordination]. A more detailed overview of the changes is possible by browsing through all the changes of the [individual 1.14.x releases|https://issues.apache.org/jira/projects/FLINK?selectedItem=com.atlassian.jira.jira-projects-plugin:release-page=released=1.14]. But that's quite tedious. > Flink Job Manager restarted after kube-apiserver connection intermittent > > > Key: FLINK-33998 > URL: https://issues.apache.org/jira/browse/FLINK-33998 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.13.6 > Environment: Kubernetes 1.24 > Flink Operator 1.4 > Flink 1.13.6 >Reporter: Xiangyan >Priority: Major > Attachments: audit-log-no-restart.txt, audit-log-restart.txt, > connection timeout.png, jm-no-restart4.log, jm-restart4.log > > > We are running Flink on AWS EKS and experienced a Job Manager restart issue > when the EKS control plane scaled up/in. > I can reproduce this issue in my local environment too. > Since I have no control over the EKS kube-apiserver, I built a Kubernetes cluster > on my own with the setup below: > * Two kube-apiservers, only one of which runs at a time; > * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13); > * Enable Flink Job Manager HA; > * Configure the Job Manager leader election timeout; > {code:java} > high-availability.kubernetes.leader-election.lease-duration: "60s" > high-availability.kubernetes.leader-election.renew-deadline: "60s"{code} > For testing, I switch the running kube-apiserver from one instance to another > each time.
When the kube-apiserver is switching, I can see that some Job > Managers restart, but some are still running normally. > Here is an example. When the kube-apiserver switched over at > 05:{color:#ff}{{*53*}}{color}:08, both JMs lost their connection to the > kube-apiserver. But the connection errors stopped within a few seconds. I > guess the connection was recovered by retrying. > However, one of the JMs (the 2nd one in the attached screenshot) reported a > "DefaultDispatcherRunner was revoked the leadership" error after the leader > election timeout (at 05:{color:#ff}{{*54*}}{color}:08) and then restarted > itself, while the other JM kept running normally. > According to the kube-apiserver audit logs, the normal JM was able to renew the leader lease > after the interruption. But there was no lease renewal request from the > failed JM until it restarted. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1452012771 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java: ## @@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) {} * Get the availability and backlog of the view. The availability represents if the view is * ready to get buffer from it. The backlog represents the number of available data buffers. * - * @param numCreditsAvailable the available credits for this {@link ResultSubpartitionView}. + * @param isCreditAvailable the availability of credits for this {@link ResultSubpartitionView}. * @return availability and backlog. */ -AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable); +AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable); Review Comment: This is a must-have change, because otherwise `UnionResultSubpartitionView` would need to determine how to distribute credits to child views, which is unnecessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
yunfengzhou-hub commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1452009484 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.executiongraph.IndexRange; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.util.Iterator; + +/** A collection of subpartition indexes. */ +public class ResultSubpartitionIndexSet extends IndexRange { Review Comment: In the future we may need to dynamically assign subpartitions to an input channel at runtime, in which case the indexes of the subpartitions may not be contiguous. The IndexSet naming offers an abstraction that is general enough for these future uses. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package
[ https://issues.apache.org/jira/browse/FLINK-34078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-34078: Assignee: Jinzhong Li > Move InternalKeyContext classes from o.a.f.runtime.state.heap to > o.a.f.runtime.state package > > > Key: FLINK-34078 > URL: https://issues.apache.org/jira/browse/FLINK-34078 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Jinzhong Li >Assignee: Jinzhong Li >Priority: Minor > Attachments: image-2024-01-15-12-57-12-667.png > > > h3. Motivation: > When the RocksDB state backend throws an illegal keyGroup check exception, > the exception stack contains a class scoped to the heap state backend, which looks > strange to users. > !image-2024-01-15-12-57-12-667.png|width=555,height=68! > h3. Proposed changes: > InternalKeyContext and InternalKeyContextImpl are commonly used by all state > backends (heap/rocksdb/changelog), so they should be moved from the > org.apache.flink.runtime.state.heap package to the org.apache.flink.runtime.state > package. > h3. Compatibility: > InternalKeyContext is annotated with @Internal, so this change has no > compatibility issues. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34081) Refactor all callers that using the public Xxx getXxx(ConfigOption configOption) and public void setXxx(ConfigOption key, Xxx value)
[ https://issues.apache.org/jira/browse/FLINK-34081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-34081: Description: FLINK-34080 deprecates some methods of Configuration; we should refactor all callers of the deprecated methods to use the recommended methods. > Refactor all callers that using the public Xxx getXxx(ConfigOption > configOption) and public void setXxx(ConfigOption key, Xxx value) > > > Key: FLINK-34081 > URL: https://issues.apache.org/jira/browse/FLINK-34081 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > FLINK-34080 deprecates some methods of Configuration; we should refactor all > callers of the deprecated methods to use the recommended methods. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package
[ https://issues.apache.org/jira/browse/FLINK-34078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806660#comment-17806660 ] Hangxiang Yu commented on FLINK-34078: -- Thanks for reporting this. It makes sense to move it to the outer package. I've already assigned it to you; please go ahead. > Move InternalKeyContext classes from o.a.f.runtime.state.heap to > o.a.f.runtime.state package > > > Key: FLINK-34078 > URL: https://issues.apache.org/jira/browse/FLINK-34078 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Minor > Attachments: image-2024-01-15-12-57-12-667.png > > > h3. Motivation: > When the RocksDB state backend throws an illegal keyGroup check exception, > the exception stack contains a class scoped to the heap state backend, which looks > strange to users. > !image-2024-01-15-12-57-12-667.png|width=555,height=68! > h3. Proposed changes: > InternalKeyContext and InternalKeyContextImpl are commonly used by all state > backends (heap/rocksdb/changelog), so they should be moved from the > org.apache.flink.runtime.state.heap package to the org.apache.flink.runtime.state > package. > h3. Compatibility: > InternalKeyContext is annotated with @Internal, so this change has no > compatibility issues. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34083) Deprecate string configuration keys and unused constants in ConfigConstants
[ https://issues.apache.org/jira/browse/FLINK-34083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-34083: --- Assignee: Xuannan Su > Deprecate string configuration keys and unused constants in ConfigConstants > --- > > Key: FLINK-34083 > URL: https://issues.apache.org/jira/browse/FLINK-34083 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Xuannan Su >Assignee: Xuannan Su >Priority: Major > Fix For: 1.19.0 > > > * Update ConfigConstants.java to deprecate and replace string configuration > keys > * Mark unused constants in ConfigConstants.java as deprecated -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34084) Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat
[ https://issues.apache.org/jira/browse/FLINK-34084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-34084: --- Assignee: Xuannan Su > Deprecate unused configuration in BinaryInput/OutputFormat and > FileInput/OutputFormat > - > > Key: FLINK-34084 > URL: https://issues.apache.org/jira/browse/FLINK-34084 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Xuannan Su >Assignee: Xuannan Su >Priority: Major > Fix For: 1.19.0 > > > Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, > and BinaryOutputFormat.java to deprecate unused string configuration keys. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange
[ https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu reassigned FLINK-34050: Assignee: Jinzhong Li > Rocksdb state has space amplification after rescaling with DeleteRange > -- > > Key: FLINK-34050 > URL: https://issues.apache.org/jira/browse/FLINK-34050 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Jinzhong Li >Assignee: Jinzhong Li >Priority: Major > Attachments: image-2024-01-10-21-23-48-134.png, > image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png > > > FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it can > cause space amplification in some cases. > We can reproduce this problem using the wordCount job: > 1) before rescaling, the state operator in the wordCount job has a parallelism of 2 and a > 4G+ full checkpoint size; > !image-2024-01-10-21-24-10-983.png|width=266,height=130! > 2) then restart the job with a parallelism of 4 (for the state operator); the full > checkpoint size of the new job will be 8G+; > 3) after many successful checkpoints, the full checkpoint size is still 8G+; > !image-2024-01-10-21-28-24-312.png|width=454,height=111! > > The root cause of this issue is that the deleted keyGroupRange does not > overlap with the current DB keyGroupRange, so new data written into RocksDB after > rescaling almost never goes through LSM compaction together with the deleted data (which belongs to > other keyGroupRanges). > > The space amplification may also affect RocksDB read performance and disk > space usage after rescaling. It looks like a regression caused by the > introduction of deleteRange for rescaling optimization. > > To solve this problem, I think we could invoke > Rocksdb.deleteFilesInRanges after deleteRange. > {code:java} > public static void clipDBWithKeyGroupRange() { > //... > List ranges = new ArrayList<>(); > //...
> deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes); > ranges.add(beginKeyGroupBytes); > ranges.add(endKeyGroupBytes); > // > for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { > db.deleteFilesInRanges(columnFamilyHandle, ranges, false); > } > } > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34085) Remove deprecated string configuration keys in Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-34085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-34085: --- Assignee: Xuannan Su > Remove deprecated string configuration keys in Flink 2.0 > > > Key: FLINK-34085 > URL: https://issues.apache.org/jira/browse/FLINK-34085 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Xuannan Su >Assignee: Xuannan Su >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange
[ https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806659#comment-17806659 ] Hangxiang Yu commented on FLINK-34050: -- [~lijinzhong] Yeah, I think this benchmark result should be enough. > Rocksdb state has space amplification after rescaling with DeleteRange > -- > > Key: FLINK-34050 > URL: https://issues.apache.org/jira/browse/FLINK-34050 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Major > Attachments: image-2024-01-10-21-23-48-134.png, > image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png > > > FLINK-21321 uses deleteRange to speed up RocksDB rescaling; however, it can > cause space amplification in some cases. > We can reproduce this problem using the wordCount job: > 1) before rescaling, the state operator in the wordCount job has a parallelism of 2 and a > 4G+ full checkpoint size; > !image-2024-01-10-21-24-10-983.png|width=266,height=130! > 2) then restart the job with a parallelism of 4 (for the state operator); the full > checkpoint size of the new job will be 8G+; > 3) after many successful checkpoints, the full checkpoint size is still 8G+; > !image-2024-01-10-21-28-24-312.png|width=454,height=111! > > The root cause of this issue is that the deleted keyGroupRange does not > overlap with the current DB keyGroupRange, so new data written into RocksDB after > rescaling almost never goes through LSM compaction together with the deleted data (which belongs to > other keyGroupRanges). > > The space amplification may also affect RocksDB read performance and disk > space usage after rescaling. It looks like a regression caused by the > introduction of deleteRange for rescaling optimization. > > To solve this problem, I think we could invoke > Rocksdb.deleteFilesInRanges after deleteRange. > {code:java} > public static void clipDBWithKeyGroupRange() { > //... > List ranges = new ArrayList<>(); > //...
> deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes); > ranges.add(beginKeyGroupBytes); > ranges.add(endKeyGroupBytes); > // > for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { > db.deleteFilesInRanges(columnFamilyHandle, ranges, false); > } > } > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-24024][table-planner] support session window tvf in plan [flink]
LadyForest commented on code in PR #23505: URL: https://github.com/apache/flink/pull/23505#discussion_r1448461452 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala: ## @@ -234,11 +238,71 @@ object WindowUtil { val step = getOperandAsLong(windowCall.operands(1)) val maxSize = getOperandAsLong(windowCall.operands(2)) new CumulativeWindowSpec(Duration.ofMillis(maxSize), Duration.ofMillis(step), offset) + case FlinkSqlOperatorTable.SESSION => +val gap = getOperandAsLong(windowCall.operands(1)) +val partitionKeys = + exploreSessionWindowPartitionKeys(scanInput) +new SessionWindowSpec(Duration.ofMillis(gap), partitionKeys) } new TimeAttributeWindowingStrategy(windowSpec, timeAttributeType, timeIndex) } + /** + * If the session window tvf has partition keys, the whole tree is like: + * + * {{{ Review Comment: Nit ```scala {{{ * TableFunctionScan * | * [Project or Calc] * | * Exchange * }}} ``` ## flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java: ## @@ -1432,9 +1433,15 @@ private void substituteSubQuery(Blackboard bb, SubQuery subQuery) { bb.cursors.add(converted.r); return; case SET_SEMANTICS_TABLE: -if (!config.isExpand()) { -return; -} +// - FLINK MODIFICATION BEGIN - +// Currently, Flink will not distinguish tvf between SET semantics and ROW +// semantics. +// And in Flink, only session window tvf need to support SET semantics. It will +// always be expanded. +// if (!config.isExpand()) { +// return; +// } Review Comment: Can we defer the sub-query rewrite to the physical phase? 
## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java: ## @@ -144,7 +145,8 @@ public RelNode visit(RelNode node) { || node instanceof FlinkLogicalSort || node instanceof FlinkLogicalOverAggregate || node instanceof FlinkLogicalExpand -|| node instanceof FlinkLogicalScriptTransform) { +|| node instanceof FlinkLogicalScriptTransform +|| node instanceof FlinkLogicalExchange) { Review Comment: I think we don't need to involve `FlinkLogicalExchange` here if we can defer the sub-query rewrite to the physical phase ## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java: ## @@ -249,4 +263,53 @@ private GeneratedNamespaceAggsHandleFunction createAggsHandler( sliceAssigner, shiftTimeZone); } + +/** + * Currently, the operator of WindowAggregate does not support Session Window and it needs to + * fall back to the legacy GroupWindowAggregate. + */ +private boolean shouldFallbackToGroupWindowAgg(WindowSpec windowSpec) { +return windowSpec instanceof SessionWindowSpec; +} + +private Transformation fallbackToGroupWindowAggregate( +PlannerBase planner, ExecNodeConfig config) { +Preconditions.checkState(windowing.getWindow() instanceof SessionWindowSpec); + +if (windowing instanceof TimeAttributeWindowingStrategy) { +LogicalType timeAttributeType = windowing.getTimeAttributeType(); +LogicalWindow logicalWindow = +new SessionGroupWindow( +new WindowReference("w$", timeAttributeType), +new FieldReferenceExpression( +// mock an empty time field name here +"", + fromLogicalTypeToDataType(timeAttributeType), +0, +((TimeAttributeWindowingStrategy) windowing) +.getTimeAttributeIndex()), +intervalOfMillis( +((SessionWindowSpec) windowing.getWindow()) +.getGap() +.toMillis())); + +StreamExecGroupWindowAggregate groupWindowAggregate = +new StreamExecGroupWindowAggregate( +planner.getTableConfig(), +grouping, +aggCalls, +logicalWindow, +
[jira] [Created] (FLINK-34085) Remove deprecated string configuration keys in Flink 2.0
Xuannan Su created FLINK-34085: -- Summary: Remove deprecated string configuration keys in Flink 2.0 Key: FLINK-34085 URL: https://issues.apache.org/jira/browse/FLINK-34085 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Xuannan Su Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34084) Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat
Xuannan Su created FLINK-34084: -- Summary: Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat Key: FLINK-34084 URL: https://issues.apache.org/jira/browse/FLINK-34084 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Xuannan Su Fix For: 1.19.0 Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, and BinaryOutputFormat.java to deprecate unused string configuration keys. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
xishuaidelin commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1451985731 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java: ## @@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable others) throws Exceptio assertKeyNotPresent(acc, key); acc.map.put(key, other.map.get(key)); } +for (final StringData key : other.retractMap.keys()) { Review Comment: Hi xuyang, thanks for your comments. It considers both the key and the corresponding value in the comparison. Therefore, I don't foresee any issues arising during the merge stage. However, the issue you mentioned could potentially occur at the local stage, such as within the retract function. I would fix it in retract function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-34083) Deprecate string configuration keys and unused constants in ConfigConstants
Xuannan Su created FLINK-34083: -- Summary: Deprecate string configuration keys and unused constants in ConfigConstants Key: FLINK-34083 URL: https://issues.apache.org/jira/browse/FLINK-34083 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Xuannan Su Fix For: 1.19.0 * Update ConfigConstants.java to deprecate and replace string configuration keys * Mark unused constants in ConfigConstants.java as deprecated -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel [flink]
reswqa commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1450161095 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ## @@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException { } @Override -public void notifyDataAvailable() { +public void notifyDataAvailable(ResultSubpartitionView view) { requestQueue.notifyReaderNonEmpty(this); } @Override public void notifyPriorityEvent(int prioritySequenceNumber) { -notifyDataAvailable(); +notifyDataAvailable(this.subpartitionView); +} + +@VisibleForTesting +public void notifyDataAvailable() { +notifyDataAvailable(subpartitionView); Review Comment: `subpartitionView ` -> `this.subpartitionView ` to consistent with `notifyPriorityEvent`. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java: ## @@ -69,10 +69,10 @@ default void notifyPriorityEvent(int priorityBufferNumber) {} * Get the availability and backlog of the view. The availability represents if the view is * ready to get buffer from it. The backlog represents the number of available data buffers. * - * @param numCreditsAvailable the available credits for this {@link ResultSubpartitionView}. + * @param isCreditAvailable the availability of credits for this {@link ResultSubpartitionView}. * @return availability and backlog. */ -AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable); +AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable); Review Comment: Is this commit a must-have change, or just a refactor? ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionIndexSet.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.executiongraph.IndexRange; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import java.util.Iterator; + +/** A collection of subpartition indexes. */ +public class ResultSubpartitionIndexSet extends IndexRange { Review Comment: I wonder what is the difference between `IndexRange` and `IndexSet`. From my side, it's better to align with the name of the parent class. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java: ## @@ -291,13 +291,18 @@ public void releaseAllResources() throws IOException { } @Override -public void notifyDataAvailable() { +public void notifyDataAvailable(ResultSubpartitionView view) { requestQueue.notifyReaderNonEmpty(this); } @Override public void notifyPriorityEvent(int prioritySequenceNumber) { -notifyDataAvailable(); +notifyDataAvailable(this.subpartitionView); +} + +@VisibleForTesting Review Comment: Why this method is `@VisibleForTesting`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34080][configuration] Simplify the Configuration [flink]
flinkbot commented on PR #24088: URL: https://github.com/apache/flink/pull/24088#issuecomment-1891392365 ## CI report: * 39fc4c7369deceabffb7bc2496300e8a8061faba UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-34080][configuration] Simplify the Configuration [flink]
1996fanrui opened a new pull request, #24088: URL: https://github.com/apache/flink/pull/24088 ## What is the purpose of the change See the part 2.2 of [FLIP-405](https://cwiki.apache.org/confluence/x/6Yr5E) ## Brief change log - [FLINK-34080][configuration] Add the `T get(ConfigOption configOption, T overrideDefault)` for Configuration - [FLINK-34080][configuration] Deprecate all getXxx and setXxx methods for Configuration - [FLINK-34080][configuration] Mark `setBytes` and `getBytes` of Configuration as `@Internal` - [FLINK-34080][configuration] Remove the `@Deprecated` for `getString(String key, String defaultValue)` of Configuration ## Verifying this change - Added `ConfigurationTest#testGetWithOverrideDefault` - Improved the `DelegatingConfigurationTest#testGetWithOverrideDefault` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34080) Simplify the Configuration
[ https://issues.apache.org/jira/browse/FLINK-34080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34080: --- Labels: pull-request-available (was: ) > Simplify the Configuration > -- > > Key: FLINK-34080 > URL: https://issues.apache.org/jira/browse/FLINK-34080 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Labels: pull-request-available > Fix For: 1.19.0 > > > This Jira is 2.2 part of FLIP-405: > * 2.2.1 Update Configuration to encourage the usage of ConfigOption over > string configuration key > * 2.2.2 Introduce public T get(ConfigOption configOption, T > overrideDefault) > * 2.2.3 Deprecate some unnecessary setXxx and getXxx methods in Configuration -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34077) Python Sphinx version error
[ https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingbo Huang updated FLINK-34077: - Issue Type: Technical Debt (was: Bug) > Python Sphinx version error > --- > > Key: FLINK-34077 > URL: https://issues.apache.org/jira/browse/FLINK-34077 > Project: Flink > Issue Type: Technical Debt > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Yunfeng Zhou >Assignee: Xingbo Huang >Priority: Major > Labels: pull-request-available > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > > {code:java} > Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d > _build/doctrees -a -W . _build/html > Jan 14 15:49:17 Running Sphinx v4.5.0 > Jan 14 15:49:17 > Jan 14 15:49:17 Sphinx version error: > Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project > needs at least Sphinx v5.0; it therefore cannot be built with this version. > > Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed > Jan 14 15:49:17 make: *** [html] Error 2 > Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34077) Python Sphinx version error
[ https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingbo Huang updated FLINK-34077: - Summary: Python Sphinx version error (was: Sphinx version needs upgrade) > Python Sphinx version error > --- > > Key: FLINK-34077 > URL: https://issues.apache.org/jira/browse/FLINK-34077 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Yunfeng Zhou >Assignee: Xingbo Huang >Priority: Major > Labels: pull-request-available > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > > {code:java} > Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d > _build/doctrees -a -W . _build/html > Jan 14 15:49:17 Running Sphinx v4.5.0 > Jan 14 15:49:17 > Jan 14 15:49:17 Sphinx version error: > Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project > needs at least Sphinx v5.0; it therefore cannot be built with this version. > > Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed > Jan 14 15:49:17 make: *** [html] Error 2 > Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34077) Sphinx version needs upgrade
[ https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingbo Huang resolved FLINK-34077. -- Resolution: Fixed Merged into master via d2fbe464b1a353a7eb35926299d5c048647a3073 > Sphinx version needs upgrade > > > Key: FLINK-34077 > URL: https://issues.apache.org/jira/browse/FLINK-34077 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Yunfeng Zhou >Assignee: Xingbo Huang >Priority: Major > Labels: pull-request-available > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > > {code:java} > Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d > _build/doctrees -a -W . _build/html > Jan 14 15:49:17 Running Sphinx v4.5.0 > Jan 14 15:49:17 > Jan 14 15:49:17 Sphinx version error: > Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project > needs at least Sphinx v5.0; it therefore cannot be built with this version. > > Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed > Jan 14 15:49:17 make: *** [html] Error 2 > Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34077][python] Limits some sphinxcontrib packages upper bounds [flink]
HuangXingBo merged PR #24086: URL: https://github.com/apache/flink/pull/24086 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806641#comment-17806641 ] Xintong Song commented on FLINK-33728: -- Sure, thanks for volunteering working on this. I've assigned you to the ticket. Please go ahead. > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Assignee: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I met massive production problem when kubernetes ETCD slow responding happen. > After Kube recoverd after 1 hour, Thousands of Flink jobs using > kubernetesResourceManagerDriver rewatched when recieving > ResourceVersionTooOld, which caused great pressure on API Server and made > API server failed again... > > I am not sure is it necessary to > getResourceEventHandler().onError(throwable) > in PodCallbackHandlerImpl# handleError method? > > We can just neglect the disconnection of watching process. and try to rewatch > once new requestResource called. And we can leverage on the akka heartbeat > timeout to discover the TM failure, just like YARN mode do. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-33728: Assignee: xiaogang zhou > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Assignee: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I met massive production problem when kubernetes ETCD slow responding happen. > After Kube recoverd after 1 hour, Thousands of Flink jobs using > kubernetesResourceManagerDriver rewatched when recieving > ResourceVersionTooOld, which caused great pressure on API Server and made > API server failed again... > > I am not sure is it necessary to > getResourceEventHandler().onError(throwable) > in PodCallbackHandlerImpl# handleError method? > > We can just neglect the disconnection of watching process. and try to rewatch > once new requestResource called. And we can leverage on the akka heartbeat > timeout to discover the TM failure, just like YARN mode do. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34077][python] Limits some sphinxcontrib packages upper bounds [flink]
HuangXingBo commented on PR #24086: URL: https://github.com/apache/flink/pull/24086#issuecomment-1891366044 https://dev.azure.com/hxbks2ks/FLINK-TEST/_build/results?buildId=2214=logs=fba17979-6d2e-591d-72f1-97cf42797c11 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34080) Simplify the Configuration
[ https://issues.apache.org/jira/browse/FLINK-34080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-34080: Priority: Blocker (was: Major) > Simplify the Configuration > -- > > Key: FLINK-34080 > URL: https://issues.apache.org/jira/browse/FLINK-34080 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Configuration >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Fix For: 1.19.0 > > > This Jira is 2.2 part of FLIP-405: > * 2.2.1 Update Configuration to encourage the usage of ConfigOption over > string configuration key > * 2.2.2 Introduce public T get(ConfigOption configOption, T > overrideDefault) > * 2.2.3 Deprecate some unnecessary setXxx and getXxx methods in Configuration -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34082) Remove deprecated methods of Configuration in 2.0
Rui Fan created FLINK-34082: --- Summary: Remove deprecated methods of Configuration in 2.0 Key: FLINK-34082 URL: https://issues.apache.org/jira/browse/FLINK-34082 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34081) Refactor all callers that using the public Xxx getXxx(ConfigOption configOption) and public void setXxx(ConfigOption key, Xxx value)
Rui Fan created FLINK-34081: --- Summary: Refactor all callers that using the public Xxx getXxx(ConfigOption configOption) and public void setXxx(ConfigOption key, Xxx value) Key: FLINK-34081 URL: https://issues.apache.org/jira/browse/FLINK-34081 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34080) Simplify the Configuration
Rui Fan created FLINK-34080: --- Summary: Simplify the Configuration Key: FLINK-34080 URL: https://issues.apache.org/jira/browse/FLINK-34080 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Rui Fan Assignee: Rui Fan Fix For: 1.19.0 This Jira is 2.2 part of FLIP-405: * 2.2.1 Update Configuration to encourage the usage of ConfigOption over string configuration key * 2.2.2 Introduce public T get(ConfigOption configOption, T overrideDefault) * 2.2.3 Deprecate some unnecessary setXxx and getXxx methods in Configuration -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806640#comment-17806640 ] xiaogang zhou commented on FLINK-33728: --- [~xtsong] [~wangyang0918] Ok, glad to hear that. Would you please help assign the ticket to me? > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I hit a massive production problem when Kubernetes etcd was responding slowly. > After Kubernetes recovered 1 hour later, thousands of Flink jobs using > KubernetesResourceManagerDriver re-watched upon receiving > ResourceVersionTooOld, which put great pressure on the API server and made > the API server fail again... > > I am not sure whether it is necessary to call > getResourceEventHandler().onError(throwable) > in the PodCallbackHandlerImpl#handleError method. > > We could simply ignore the disconnection of the watch and try to re-watch > once a new requestResource is called. We can rely on the Akka heartbeat > timeout to detect TM failures, just as YARN mode does. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34079) FLIP-405: Migrate string configuration key to ConfigOption
Rui Fan created FLINK-34079: --- Summary: FLIP-405: Migrate string configuration key to ConfigOption Key: FLINK-34079 URL: https://issues.apache.org/jira/browse/FLINK-34079 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Reporter: Rui Fan Assignee: Xuannan Su Fix For: 1.19.0 This is an umbrella Jira of [FLIP-405|https://cwiki.apache.org/confluence/x/6Yr5E] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34078) Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package
Jinzhong Li created FLINK-34078: --- Summary: Move InternalKeyContext classes from o.a.f.runtime.state.heap to o.a.f.runtime.state package Key: FLINK-34078 URL: https://issues.apache.org/jira/browse/FLINK-34078 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Jinzhong Li Attachments: image-2024-01-15-12-57-12-667.png h3. Motivation: When the RocksDB state backend throws an illegal key-group check exception, the exception stack contains a class scoped to the heap state backend, which looks strange to users. !image-2024-01-15-12-57-12-667.png|width=555,height=68! h3. Proposed changes: InternalKeyContext and InternalKeyContextImpl are commonly used by all state backends (heap/rocksdb/changelog), so they should be moved from the org.apache.flink.runtime.state.heap package to the org.apache.flink.runtime.state package. h3. Compatibility: InternalKeyContext is annotated with @Internal, so this change has no compatibility issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33576][core] Introduce new Flink conf file 'config.yaml' supporting standard YAML syntax. [flink]
JunRuiLee commented on PR #23852: URL: https://github.com/apache/flink/pull/23852#issuecomment-1891254763 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-32978) Deprecate RichFunction#open(Configuration parameters)
[ https://issues.apache.org/jira/browse/FLINK-32978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-32978. Resolution: Fixed Breaking changes reverted. master (1.19): 1d6150f386d9c9ec61f4ab30853b915de7712047 > Deprecate RichFunction#open(Configuration parameters) > - > > Key: FLINK-32978 > URL: https://issues.apache.org/jira/browse/FLINK-32978 > Project: Flink > Issue Type: Technical Debt > Components: API / Core >Affects Versions: 1.19.0 >Reporter: Wencong Liu >Assignee: Wencong Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > The > [FLIP-344|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231] > has decided that the parameter in RichFunction#open will be removed in the > next major version. We should deprecate it now and remove it in Flink 2.0. > The removal will be tracked in > [FLINK-6912|https://issues.apache.org/jira/browse/FLINK-6912]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] Revert the breaking change to the public implementations of RichFunction [flink]
xintongsong closed pull request #24067: [hotfix] Revert the breaking change to the public implementations of RichFunction URL: https://github.com/apache/flink/pull/24067 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806608#comment-17806608 ] Junrui Li commented on FLINK-27756: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56300=logs=2e8cb2f7-b2d3-5c62-9c05-cd756d33a819=2dd510a3-5041-5201-6dc3-54d310f68906 > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.0, 1.19.0 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
JunRuiLee commented on code in PR #24025: URL: https://github.com/apache/flink/pull/24025#discussion_r1451894290 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ## @@ -792,7 +792,11 @@ public CompletableFuture disposeSavepoint(String savepointPath, Tim try { Checkpoints.disposeSavepoint( -savepointPath, configuration, classLoader, log); +savepointPath, +new Configuration(), Review Comment: disposeSavepoint is a cluster-wide operation, so it does not require job-specific configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
JunRuiLee commented on code in PR #24025: URL: https://github.com/apache/flink/pull/24025#discussion_r1451892446 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java: ## @@ -82,7 +83,8 @@ public void setUp() { @Test public void testImmediateCheckpointing() throws Exception { env.setRestartStrategy(RestartStrategies.noRestart()); -env.enableCheckpointing(Long.MAX_VALUE - 1); +env.enableCheckpointing( +Duration.ofNanos(Long.MAX_VALUE /* max allowed by FLINK */).toMillis()); Review Comment: 1. The previous use of Long.MAX_VALUE - 1 as the checkpoint interval was intentional because using Long.MAX_VALUE itself would effectively disable checkpointing, which isn't the desired behavior. The intention was to enable checkpointing with an interval so large that it effectively prevents any checkpoint from being triggered during the test run. The use of Duration.ofNanos(Long.MAX_VALUE).toMillis() aligns well with this scenario. 2. I will review all instances where a large checkpoint interval is used and add comments to clarify the reason for using these intervals. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806605#comment-17806605 ] Jiabao Sun commented on FLINK-34076: Hi [~khanhvu], do you add dependencies with "provided" scope to classpath? I can run correctly locally. {code:xml} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided org.apache.flink flink-streaming-java ${flink.version} provided org.apache.flink flink-table-api-java-bridge ${flink.version} provided org.apache.flink flink-table-planner-loader ${flink.version} provided org.apache.flink flink-table-runtime ${flink.version} provided {code} > flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Khanh Vu >Priority: Major > Attachments: screenshot-1.png > > > The > [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] > which stops bundling `flink-connector-base` with `flink-connector-kinesis` > has caused kinesis sink failing to create when using Table API as required > classes from `flink-connector-base` are not loaded in runtime. > E.g. 
with following depenency only in pom.xml > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > {code} > and a minimal job definition: > {code:java} > public static void main(String[] args) throws Exception { > // create data stream environment > StreamExecutionEnvironment sEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); > StreamTableEnvironment tEnv = > StreamTableEnvironment.create(sEnv); > Schema a = Schema.newBuilder().column("a", > DataTypes.STRING()).build(); > TableDescriptor descriptor = > TableDescriptor.forConnector("kinesis") > .schema(a) > .format("json") > .build(); > tEnv.createTemporaryTable("sinkTable", descriptor); > tEnv.executeSql("CREATE TABLE sinkTable " + > descriptor.toString()).print(); > } > {code} > following exception will be thrown: > {code:java} > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory > at > jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > ~[?:?] > at > jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > ~[?:?] > at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] > ... 28 more > {code} > The fix is to explicitly specify `flink-connector-base` as dependency of the > project: > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > > org.apache.flink > flink-connector-base > ${flink.version} > provided > > {code} > In general, `flink-connector-base` should be pulled in by default when > pulling in the kinesis connector, the current separation adds unnecessary > hassle to use the connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33792) Generate the same code for the same logic
[ https://issues.apache.org/jira/browse/FLINK-33792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li resolved FLINK-33792. Fix Version/s: 1.19.0 Resolution: Fixed Implemented via d26c1b668b7febc60aab1e4174f568958cd615d3 (1.19.0) [~zoudan] Thanks for your work! And also thanks [~lsy] for the review! > Generate the same code for the same logic > - > > Key: FLINK-33792 > URL: https://issues.apache.org/jira/browse/FLINK-33792 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Dan Zou >Assignee: Dan Zou >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Generate the same code for the same logic, so that we may reuse the generated > code between different jobs. This is the precondition for FLINK-28691. The > current issue is we use a self-incrementing counter in CodeGenUtils#newName, > it means we could not get the same generated class between two queries even > when they are exactly the same. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]
flinkbot commented on PR #24087: URL: https://github.com/apache/flink/pull/24087#issuecomment-1891229367 ## CI report: * e81e5b792e05f208b281274fde872a8ca0035253 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33792] Generate the same code for the same logic [flink]
libenchao closed pull request #23984: [FLINK-33792] Generate the same code for the same logic URL: https://github.com/apache/flink/pull/23984 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076 ] Jiabao Sun deleted comment on FLINK-34076: was (Author: jiabao.sun): Hi [~khanhvu], do you add dependencies with "provided" scope to classpath? !screenshot-1.png! > flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Khanh Vu >Priority: Major > Attachments: screenshot-1.png > > > The > [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] > which stops bundling `flink-connector-base` with `flink-connector-kinesis` > has caused kinesis sink failing to create when using Table API as required > classes from `flink-connector-base` are not loaded in runtime. > E.g. with following depenency only in pom.xml > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > {code} > and a minimal job definition: > {code:java} > public static void main(String[] args) throws Exception { > // create data stream environment > StreamExecutionEnvironment sEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); > StreamTableEnvironment tEnv = > StreamTableEnvironment.create(sEnv); > Schema a = Schema.newBuilder().column("a", > DataTypes.STRING()).build(); > TableDescriptor descriptor = > TableDescriptor.forConnector("kinesis") > .schema(a) > .format("json") > .build(); > tEnv.createTemporaryTable("sinkTable", descriptor); > tEnv.executeSql("CREATE TABLE sinkTable " + > descriptor.toString()).print(); > } > {code} > following exception will be thrown: > {code:java} > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory > at > jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > ~[?:?] 
> at > jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > ~[?:?] > at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] > ... 28 more > {code} > The fix is to explicitly specify `flink-connector-base` as dependency of the > project: > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > > org.apache.flink > flink-connector-base > ${flink.version} > provided > > {code} > In general, `flink-connector-base` should be pulled in by default when > pulling in the kinesis connector, the current separation adds unnecessary > hassle to use the connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun updated FLINK-34076: --- Attachment: screenshot-1.png > flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Khanh Vu >Priority: Major > Attachments: screenshot-1.png > > > The > [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] > which stops bundling `flink-connector-base` with `flink-connector-kinesis` > has caused kinesis sink failing to create when using Table API as required > classes from `flink-connector-base` are not loaded in runtime. > E.g. with following depenency only in pom.xml > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > {code} > and a minimal job definition: > {code:java} > public static void main(String[] args) throws Exception { > // create data stream environment > StreamExecutionEnvironment sEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); > StreamTableEnvironment tEnv = > StreamTableEnvironment.create(sEnv); > Schema a = Schema.newBuilder().column("a", > DataTypes.STRING()).build(); > TableDescriptor descriptor = > TableDescriptor.forConnector("kinesis") > .schema(a) > .format("json") > .build(); > tEnv.createTemporaryTable("sinkTable", descriptor); > tEnv.executeSql("CREATE TABLE sinkTable " + > descriptor.toString()).print(); > } > {code} > following exception will be thrown: > {code:java} > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory > at > jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > ~[?:?] 
> at > jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > ~[?:?] > at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] > ... 28 more > {code} > The fix is to explicitly specify `flink-connector-base` as dependency of the > project: > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > > org.apache.flink > flink-connector-base > ${flink.version} > provided > > {code} > In general, `flink-connector-base` should be pulled in by default when > pulling in the kinesis connector, the current separation adds unnecessary > hassle to use the connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806603#comment-17806603 ] Jiabao Sun commented on FLINK-34076: Hi [~khanhvu], do you add dependencies with "provided" scope to classpath? !screenshot-1.png! > flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Khanh Vu >Priority: Major > Attachments: screenshot-1.png > > > The > [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] > which stops bundling `flink-connector-base` with `flink-connector-kinesis` > has caused kinesis sink failing to create when using Table API as required > classes from `flink-connector-base` are not loaded in runtime. > E.g. with following depenency only in pom.xml > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > {code} > and a minimal job definition: > {code:java} > public static void main(String[] args) throws Exception { > // create data stream environment > StreamExecutionEnvironment sEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); > StreamTableEnvironment tEnv = > StreamTableEnvironment.create(sEnv); > Schema a = Schema.newBuilder().column("a", > DataTypes.STRING()).build(); > TableDescriptor descriptor = > TableDescriptor.forConnector("kinesis") > .schema(a) > .format("json") > .build(); > tEnv.createTemporaryTable("sinkTable", descriptor); > tEnv.executeSql("CREATE TABLE sinkTable " + > descriptor.toString()).print(); > } > {code} > following exception will be thrown: > {code:java} > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory > at > 
jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > ~[?:?] > at > jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > ~[?:?] > at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] > ... 28 more > {code} > The fix is to explicitly specify `flink-connector-base` as dependency of the > project: > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > > org.apache.flink > flink-connector-base > ${flink.version} > provided > > {code} > In general, `flink-connector-base` should be pulled in by default when > pulling in the kinesis connector, the current separation adds unnecessary > hassle to use the connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]
SinBex opened a new pull request, #24087: URL: https://github.com/apache/flink/pull/24087 ## What is the purpose of the change Currently, for JobVertices without parallelism configured, the AdaptiveBatchScheduler dynamically infers the vertex parallelism based on the volume of input data. Specifically, for Source vertices, it uses the value of `execution.batch.adaptive.auto-parallelism.default-source-parallelism` as the fixed parallelism. If this is not set by the user, the default value of 1 is used as the source parallelism, which is actually a temporary implementation solution. We aim to support dynamic source parallelism inference for batch jobs ## Brief change log - *Lazily initialize the parallelism of the OperatorCoordinator.* - *Add the `DynamicParallelismInference` and `DynamicFilteringInfo` interfaces, and enable the `SourceCoordinator` to invoke corresponding methods for dynamic parallelism inference.* - *The `AdaptiveBatchScheduler` applies dynamic source parallelism inference, and to avoid blocking the main thread by calling external systems, we have transformed the scheduling process to be asynchronous.* ## Verifying this change This change added tests and can be verified as follows: - *Added integration tests to verify the end-to-end logic of dynamic parallelism inference, see `AdaptiveBatchSchedulerITCase#testSchedulingWithDynamicSourceParallelismInference` for details.* - *Added unit tests for the newly added methods in classes such as `SourceCoordinator` and `AdaptiveBatchScheduler`.* - *Manually verified the feature on a Flink session cluster (1 JobManager, 77 TaskManagers), including dynamic inference of parallelism, asynchronous scheduling, and execution exceptions in `DynamicParallelismInference`, all performing as expected.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - 
The serializers: (no) - The runtime per-record code paths (performance sensitive): (no ) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs / JavaDocs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34007) Flink Job stuck in suspend state after losing leadership in HA Mode
[ https://issues.apache.org/jira/browse/FLINK-34007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806600#comment-17806600 ] Yang Wang commented on FLINK-34007: --- Maybe I did not make myself clear. I mean the old leader JM should try to remove the annotation of HA ConfigMap {{control-plane.alpha.kubernetes.io/leader}} when lost leadership. From the fabric8 K8s client impl[1], {{isLeader}} callback will be executed only when the holder identity changed. [1]. [https://github.com/fabric8io/kubernetes-client/blob/v6.6.2/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L232] > Flink Job stuck in suspend state after losing leadership in HA Mode > --- > > Key: FLINK-34007 > URL: https://issues.apache.org/jira/browse/FLINK-34007 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.18.1, 1.18.2 >Reporter: Zhenqiu Huang >Priority: Major > Attachments: Debug.log, job-manager.log > > > The observation is that Job manager goes to suspend state with a failed > container not able to register itself to resource manager after timeout. > JM Log, see attached > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34050) Rocksdb state has space amplification after rescaling with DeleteRange
[ https://issues.apache.org/jira/browse/FLINK-34050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806599#comment-17806599 ] Jinzhong Li commented on FLINK-34050: - Thanks for your reply. [~mayuehappy] [~masteryhx] IMO, it is unreasonable that redundant data can't be cleaned up for a long time after rescaling. Especially in scenarios where disk space is very tight, this behavior is a major drawback. I agree with that deleteRange+deleteFilesInRanges could be a good default behaviors. As for the performance check about deleteRange+deleteFilesInRanges vs deleteRange, i think the rescaling-state-benchmark should satisfy this [1]. WDYT? [1] https://github.com/apache/flink-benchmarks/blob/master/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java > Rocksdb state has space amplification after rescaling with DeleteRange > -- > > Key: FLINK-34050 > URL: https://issues.apache.org/jira/browse/FLINK-34050 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Major > Attachments: image-2024-01-10-21-23-48-134.png, > image-2024-01-10-21-24-10-983.png, image-2024-01-10-21-28-24-312.png > > > FLINK-21321 use deleteRange to speed up rocksdb rescaling, however it will > cause space amplification in some case. > We can reproduce this problem using wordCount job: > 1) before rescaling, state operator in wordCount job has 2 parallelism and > 4G+ full checkpoint size; > !image-2024-01-10-21-24-10-983.png|width=266,height=130! > 2) then restart job with 4 parallelism (for state operator), the full > checkpoint size of new job will be 8G+ ; > 3) after many successful checkpoints, the full checkpoint size is still 8G+; > !image-2024-01-10-21-28-24-312.png|width=454,height=111! 
> > The root cause of this issue is that the deleted keyGroupRange does not > overlap with current DB keyGroupRange, so new data written into rocksdb after > rescaling almost never do LSM compaction with the deleted data (belonging to > other keyGroupRange.) > > And the space amplification may affect Rocksdb read performance and disk > space usage after rescaling. It looks like a regression due to the > introduction of deleteRange for rescaling optimization. > > To slove this problem, I think maybe we can invoke > Rocksdb.deleteFilesInRanges after deleteRange? > {code:java} > public static void clipDBWithKeyGroupRange() { > //... > List ranges = new ArrayList<>(); > //... > deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes); > ranges.add(beginKeyGroupBytes); > ranges.add(endKeyGroupBytes); > // > for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { > db.deleteFilesInRanges(columnFamilyHandle, ranges, false); > } > } > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33768] Support dynamic source parallelism inference for batch jobs [flink]
SinBex closed pull request #24078: [FLINK-33768] Support dynamic source parallelism inference for batch jobs URL: https://github.com/apache/flink/pull/24078 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34035) when flinksql with group by partition field, some errors occured in jobmanager.log
[ https://issues.apache.org/jira/browse/FLINK-34035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hansonhe updated FLINK-34035: - Summary: when flinksql with group by partition field, some errors occured in jobmanager.log (was: when flinksql with group by partition some errors field occured in jobmanager.log) > when flinksql with group by partition field, some errors occured in > jobmanager.log > -- > > Key: FLINK-34035 > URL: https://issues.apache.org/jira/browse/FLINK-34035 > Project: Flink > Issue Type: Bug >Affects Versions: 1.17.1 >Reporter: hansonhe >Priority: Major > > flink.version=1.17.1 > kyuubi.version=1.8.0 > hive.version=3.1.2 > when run some hive sql as followings: > CREATE CATALOG bidwhive WITH ('type' = 'hive', 'hive-version' = '3.1.2', > 'default-database' = 'test'); > (1)select count({_}) from bidwhive.test.dws_test where dt='2024-01-02' ;{_} > _+-+_ > _| EXPR$0 |_ > _+-+_ > _| 1317 |_ > _+-+_ > _It's OK. There is no errors anywhere._ > {_}(2)select dt,count({_}) from bidwhive.test.dws_test where dt='2024-01-02' > group by dt; > {+}--{+}--+ > |dt|EXPR$1| > {+}--{+}--+ > |2024-01-02|1317| > {+}--{+}--+ > It can get correct result. But when i check jobmanager.log,I found some > errors appeared repeatly as folowings.Sometimes the errors also appeared on > the client terminal. I don't known whether these error will affect task > runtime or not?. Can somebody help me to have a see? > ''' > 2024-01-09 14:03:25,979 WARN > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An > exception occurred when fetching query results > java.util.concurrent.ExecutionException: > org.apache.flink.util.FlinkException: Coordinator of operator > e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the job vertex this > operator belongs to is not initialized. 
at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > ~[?:1.8.0_191] > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > ~[?:1.8.0_191] > at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:170) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) > [flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) > [flink-table-planner_b1e58bff-c004-4dba-b7d4-fff4e8145073.jar:1.17.1] > at > org.apache.flink.table.gateway.service.result.ResultStore$ResultRetrievalThread.run(ResultStore.java:155) > [flink-sql-gateway-1.17.1.jar:1.17.1]Caused by: > org.apache.flink.util.FlinkException: Coordinator of operator > e9a3cbdf90f308bdf13b34acd6410e2b does not exist or the job vertex this > operator belongs to is not initialized. 
at > org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:135) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1048) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:602) > ~[flink-dist-1.17.1.jar:1.17.1] > at > org.apache.flink.runtime.jobmaster.JobMaster.deliverCoordinationRequestToCoordinator(JobMaster.java:918) > ~[flink-dist-1.17.1.jar:1.17.1] > at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) ~[?:?] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_191] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) > ~[?:?] > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > ~[?:?] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) > ~[?:?] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) > ~[?:?] > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) > ~[?:?] > at >
Re: [PR] [FLINK-34077][python] Limits some sphinxcontrib packages upper bounds [flink]
flinkbot commented on PR #24086: URL: https://github.com/apache/flink/pull/24086#issuecomment-1891216727 ## CI report: * 0e96556bd28f20a0e60008df3c9ceabd00598a3d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34077) Sphinx version needs upgrade
[ https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34077: --- Labels: pull-request-available (was: ) > Sphinx version needs upgrade > > > Key: FLINK-34077 > URL: https://issues.apache.org/jira/browse/FLINK-34077 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Yunfeng Zhou >Assignee: Xingbo Huang >Priority: Major > Labels: pull-request-available > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > > {code:java} > Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d > _build/doctrees -a -W . _build/html > Jan 14 15:49:17 Running Sphinx v4.5.0 > Jan 14 15:49:17 > Jan 14 15:49:17 Sphinx version error: > Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project > needs at least Sphinx v5.0; it therefore cannot be built with this version. > > Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed > Jan 14 15:49:17 make: *** [html] Error 2 > Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34077][python] Limits some sphinxcontrib packages upper bounds [flink]
HuangXingBo opened a new pull request, #24086: URL: https://github.com/apache/flink/pull/24086 ## What is the purpose of the change *This pull request will limit some sphinxcontrib packages upper bounds* ## Brief change log - *Limits some sphinxcontrib packages upper bounds* ## Verifying this change This change added tests and can be verified as follows: - *current tests* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34049][table]Refactor classes related to window TVF aggregation to prepare for non-aligned windows [flink]
xuyangzhong commented on PR #24068: URL: https://github.com/apache/flink/pull/24068#issuecomment-1891209495 The CI failure is caused by https://issues.apache.org/jira/browse/FLINK-34077 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33998) Flink Job Manager restarted after kube-apiserver connection intermittent
[ https://issues.apache.org/jira/browse/FLINK-33998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806596#comment-17806596 ] Xiangyan commented on FLINK-33998: -- I can't reproduce it in 1.14.6. May I know where I can find the change history to confirm the fix? Thanks! > Flink Job Manager restarted after kube-apiserver connection intermittent > > > Key: FLINK-33998 > URL: https://issues.apache.org/jira/browse/FLINK-33998 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.13.6 > Environment: Kubernetes 1.24 > Flink Operator 1.4 > Flink 1.13.6 >Reporter: Xiangyan >Priority: Major > Attachments: audit-log-no-restart.txt, audit-log-restart.txt, > connection timeout.png, jm-no-restart4.log, jm-restart4.log > > > We are running Flink on AWS EKS and experienced Job Manager restart issue > when EKS control plane scaled up/in. > I can reproduce this issue in my local environment too. > Since I have no control of EKS kube-apiserver, I built a Kubernetes cluster > by my own with below setup: > * Two kube-apiserver, only one is running at a time; > * Deploy multiple Flink clusters (with Flink Operator 1.4 and Flink 1.13); > * Enable Flink Job Manager HA; > * Configure Job Manager leader election timeout; > {code:java} > high-availability.kubernetes.leader-election.lease-duration: "60s" > high-availability.kubernetes.leader-election.renew-deadline: "60s"{code} > For testing, I switch the running kube-apiserver from one instance to another > each time. When the kube-apiserver is switching, I can see that some Job > Managers restart, but some are still running normally. > Here is an example. When kube-apiserver switched over at > 05:{color:#ff}{{*53*}}{color}:08, both JM lost connection to > kube-apiserver. But there is no more connection error within a few seconds. I > guess the connection recovered by retry.
> However, one of the JM (the 2nd one in the attached screen shot) reported > "DefaultDispatcherRunner was revoked the leadership" error after the leader > election timeout (at 05:{color:#ff}{{*54*}}{color}:08) and then restarted > itself. While the other JM was still running normally. > From kube-apiserver audit logs, the normal JM was able to renew leader lease > after the interruption. But there is no any lease renew request from the > failed JM until it restarted. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34077) Sphinx version needs upgrade
[ https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806595#comment-17806595 ] Xingbo Huang commented on FLINK-34077: -- Some sphinxcontrib packages (sphinxcontrib-applehelp, sphinxcontrib.devhelp, sphinxcontrib.htmlhelp, and so on) have released new versions that are not compatible with the Sphinx version we use, so the documentation build fails. I will hotfix this by limiting the versions of these packages. Upgrading the Sphinx version itself requires changes to the current conf configuration that are incompatible with it, so I think that can be done separately as a new feature. > Sphinx version needs upgrade > > > Key: FLINK-34077 > URL: https://issues.apache.org/jira/browse/FLINK-34077 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Yunfeng Zhou >Priority: Major > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > > {code:java} > Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d > _build/doctrees -a -W . _build/html > Jan 14 15:49:17 Running Sphinx v4.5.0 > Jan 14 15:49:17 > Jan 14 15:49:17 Sphinx version error: > Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project > needs at least Sphinx v5.0; it therefore cannot be built with this version. > > Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed > Jan 14 15:49:17 make: *** [html] Error 2 > Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34077) Sphinx version needs upgrade
[ https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingbo Huang reassigned FLINK-34077: Assignee: Xingbo Huang > Sphinx version needs upgrade > > > Key: FLINK-34077 > URL: https://issues.apache.org/jira/browse/FLINK-34077 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Yunfeng Zhou >Assignee: Xingbo Huang >Priority: Major > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > > {code:java} > Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d > _build/doctrees -a -W . _build/html > Jan 14 15:49:17 Running Sphinx v4.5.0 > Jan 14 15:49:17 > Jan 14 15:49:17 Sphinx version error: > Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project > needs at least Sphinx v5.0; it therefore cannot be built with this version. > > Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed > Jan 14 15:49:17 make: *** [html] Error 2 > Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33057) Add options to disable creating job-id subdirectories under the checkpoint directory
[ https://issues.apache.org/jira/browse/FLINK-33057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-33057. -- Fix Version/s: 1.19.0 Resolution: Fixed merged 73b036c3 into master > Add options to disable creating job-id subdirectories under the checkpoint > directory > > > Key: FLINK-33057 > URL: https://issues.apache.org/jira/browse/FLINK-33057 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.19.0 >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > By default, Flink creates subdirectories named by UUID (job id) under > checkpoint directory for each job. It's a good means to avoid collision. > However, it also bring in some effort to remember/find the right directory > when recovering from previous checkpoint. According to previous discussion > ([Yun > Tang's|https://issues.apache.org/jira/browse/FLINK-11789?focusedCommentId=16782314=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16782314] > and [Stephan > Ewen's|https://issues.apache.org/jira/browse/FLINK-9043?focusedCommentId=16409254=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16409254] > ), I think it would be useful to add an option to disable creating the UUID > subdirectories under the checkpoint directory. For compatibility > considerations, we create the subdirectories by default. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806592#comment-17806592 ] Yang Wang commented on FLINK-33728: --- Not only does the {{KubernetesResourceManagerDriver}} create a new watch when it receives {{{}TooOldResourceVersion{}}}, but the fabric8 K8s client also has similar logic in {{{}Reflector.java{}}}[1], which we are using for the Flink Kubernetes HA implementation. In my opinion, the K8s APIServer should be able to protect itself by using flow control[2]. It will then reject some requests if it cannot process that many. Flink will then retry creating a new watch when the previous one failed. What Flink could additionally do is use an {{ExponentialBackoffDelayRetryStrategy}} to replace the current continuous retry strategy. [1]. [https://github.com/fabric8io/kubernetes-client/blob/v6.6.2/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java#L288] [2]. [https://kubernetes.io/docs/concepts/cluster-administration/flow-control/] > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I met massive production problem when kubernetes ETCD slow responding happen. > After Kube recovered after 1 hour, Thousands of Flink jobs using > kubernetesResourceManagerDriver rewatched when receiving > ResourceVersionTooOld, which caused great pressure on API Server and made > API server failed again... > > I am not sure is it necessary to > getResourceEventHandler().onError(throwable) > in PodCallbackHandlerImpl# handleError method? > > We can just neglect the disconnection of watching process. and try to rewatch > once new requestResource called.
And we can leverage the akka heartbeat > timeout to discover the TM failure, just like YARN mode does. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33057][Checkpointing] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]
masteryhx closed pull request #23509: [FLINK-33057][Checkpointing] Add options to disable creating job-id subdirectories under the checkpoint directory URL: https://github.com/apache/flink/pull/23509 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-33879) Hybrid Shuffle may stop working for a while during redistribution
[ https://issues.apache.org/jira/browse/FLINK-33879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-33879. -- Resolution: Fixed master(1.19) via 879509d7ca886f8f0ed4dd966e859d3c2a5aa231. > Hybrid Shuffle may stop working for a while during redistribution > - > > Key: FLINK-33879 > URL: https://issues.apache.org/jira/browse/FLINK-33879 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: Jiang Xin >Assignee: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Currently, the Hybrid Shuffle can work with the memory tier and disk tier > together, however, in the following scenario the result partition would stop > working. > Suppose we have a shuffle task with 2 sub-partitions. The LocalBufferPool has > 15 buffers, the memory tier can use at most 15-(2*(2+1)+1) = 8 buffers > according to `TieredStorageMemoryManagerImpl#getMaxNonReclaimableBuffers`. If > the memory tier uses up all 8 buffers and the input channel consumes them > very slowly because of problems, e.g. unstable network, the disk tier can > still work with 1 reserved buffer. However, if a redistribution happens now > and the pool size is decreased to less than 8, then the BufferAccumulator can > not request buffers anymore, and thus the result partition stops working > until the buffers in the memory tier are recycled. > The purpose is to make the result partition still work with the disk tier and > write the shuffle data to disk so that once the input channel is ready, the > data on the disk can be consumed immediately. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33879] Avoids the potential hang of Hybrid Shuffle during redistribution [flink]
reswqa merged PR #23957: URL: https://github.com/apache/flink/pull/23957 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-30613) Improve resolving schema compatibility -- Milestone one
[ https://issues.apache.org/jira/browse/FLINK-30613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-30613. -- Resolution: Fixed merged 13921a08...0e5de813 into master > Improve resolving schema compatibility -- Milestone one > --- > > Key: FLINK-30613 > URL: https://issues.apache.org/jira/browse/FLINK-30613 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > > In the milestone one, we should: > # Add an extra method > (TypeserializeSnapshotr#resolveSchemaCompatibility(TypeSerializerSnapshot > oldSerializerSnapshot)) in TypeSerializerSnapshot.java as above, and return > INCOMPATIBLE as default. > # Mark the original method as deprecated and it will use new method to > resolve as default. > # Implement the new method for all built-in TypeserializerSnapshots. > See FLIP-263 for more details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx closed pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one URL: https://github.com/apache/flink/pull/21635 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail
[ https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806586#comment-17806586 ] Xintong Song commented on FLINK-33728: -- Sorry for the late reply, I was distracted by some other work last week. I think you are right that the JM will kill itself if the re-watch does not succeed. I think it is expected in most cases that the client tries to re-watch immediately after seeing a ResourceVersionTooOld exception. However, if the first attempt to re-watch fails, the JM should not kill itself immediately, but may retry with some backoff interval. cc [~wangyang0918] > do not rewatch when KubernetesResourceManagerDriver watch fail > -- > > Key: FLINK-33728 > URL: https://issues.apache.org/jira/browse/FLINK-33728 > Project: Flink > Issue Type: New Feature > Components: Deployment / Kubernetes >Reporter: xiaogang zhou >Priority: Major > Labels: pull-request-available > > I met massive production problem when kubernetes ETCD slow responding happen. > After Kube recovered after 1 hour, Thousands of Flink jobs using > kubernetesResourceManagerDriver rewatched when receiving > ResourceVersionTooOld, which caused great pressure on API Server and made > API server failed again... > > I am not sure is it necessary to > getResourceEventHandler().onError(throwable) > in PodCallbackHandlerImpl# handleError method? > > We can just neglect the disconnection of watching process. and try to rewatch > once new requestResource called. And we can leverage on the akka heartbeat > timeout to discover the TM failure, just like YARN mode do. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34077) Sphinx version needs upgrade
[ https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806581#comment-17806581 ] xuyang commented on FLINK-34077: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56351=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901 > Sphinx version needs upgrade > > > Key: FLINK-34077 > URL: https://issues.apache.org/jira/browse/FLINK-34077 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Yunfeng Zhou >Priority: Major > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > > {code:java} > Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d > _build/doctrees -a -W . _build/html > Jan 14 15:49:17 Running Sphinx v4.5.0 > Jan 14 15:49:17 > Jan 14 15:49:17 Sphinx version error: > Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project > needs at least Sphinx v5.0; it therefore cannot be built with this version. > > Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed > Jan 14 15:49:17 make: *** [html] Error 2 > Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34077) Sphinx version needs upgrade
[ https://issues.apache.org/jira/browse/FLINK-34077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yunfeng Zhou updated FLINK-34077: - Description: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] {code:java} Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d _build/doctrees -a -W . _build/html Jan 14 15:49:17 Running Sphinx v4.5.0 Jan 14 15:49:17 Jan 14 15:49:17 Sphinx version error: Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project needs at least Sphinx v5.0; it therefore cannot be built with this version. Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed Jan 14 15:49:17 make: *** [html] Error 2 Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} was: [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] {code:java} Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d _build/doctrees -a -W . _build/htmlJan 14 15:49:17 Running Sphinx v4.5.0 Jan 14 15:49:17Jan 14 15:49:17 Sphinx version error:Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project needs at least Sphinx v5.0; it therefore cannot be built with this version.Jan 14 15:49:17 Makefile:76: recipe for target 'html' failedJan 14 15:49:17 make: *** [html] Error 2Jan 14 15:49:18 ==sphinx checks... 
[FAILED]=== {code} > Sphinx version needs upgrade > > > Key: FLINK-34077 > URL: https://issues.apache.org/jira/browse/FLINK-34077 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.19.0 >Reporter: Yunfeng Zhou >Priority: Major > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] > > {code:java} > Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d > _build/doctrees -a -W . _build/html > Jan 14 15:49:17 Running Sphinx v4.5.0 > Jan 14 15:49:17 > Jan 14 15:49:17 Sphinx version error: > Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project > needs at least Sphinx v5.0; it therefore cannot be built with this version. > > Jan 14 15:49:17 Makefile:76: recipe for target 'html' failed > Jan 14 15:49:17 make: *** [html] Error 2 > Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34077) Sphinx version needs upgrade
Yunfeng Zhou created FLINK-34077: Summary: Sphinx version needs upgrade Key: FLINK-34077 URL: https://issues.apache.org/jira/browse/FLINK-34077 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.19.0 Reporter: Yunfeng Zhou [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56357=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901] {code:java} Jan 14 15:49:17 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d _build/doctrees -a -W . _build/htmlJan 14 15:49:17 Running Sphinx v4.5.0 Jan 14 15:49:17Jan 14 15:49:17 Sphinx version error:Jan 14 15:49:17 The sphinxcontrib.applehelp extension used by this project needs at least Sphinx v5.0; it therefore cannot be built with this version.Jan 14 15:49:17 Makefile:76: recipe for target 'html' failedJan 14 15:49:17 make: *** [html] Error 2Jan 14 15:49:18 ==sphinx checks... [FAILED]=== {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost
[ https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806520#comment-17806520 ] Yang LI commented on FLINK-34063: - Ok [~dmvk] , I'll do it > When snapshot compression is enabled, rescaling of a source operator leads to > some splits getting lost > -- > > Key: FLINK-34063 > URL: https://issues.apache.org/jira/browse/FLINK-34063 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.18.0, 1.18.1 > Environment: Can be reproduced in any environment. The most important > thing is to enable snapshot compression. >Reporter: Ivan Burmistrov >Assignee: David Morávek >Priority: Blocker > Labels: pull-request-available > Fix For: 1.18.2 > > Attachments: image-2024-01-11-16-27-09-066.png, > image-2024-01-11-16-30-47-466.png > > > h2. Backstory > We've been experimenting with Autoscaling on the Flink 1.18 and faced a > pretty nasty bug. > The symptoms on our production system were as following. After a while after > deploying a job with autoscaler it started accumulating Kafka lag, and this > could only be observed via external lag measurement - from inside Flink > (measured by > {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK: > !image-2024-01-11-16-27-09-066.png|width=887,height=263! > After some digging, it turned out that the job has lost some Kafka partitions > - i.e. it stopped consuming from them, “forgot” about their existence. That’s > why from the Flink’s perspective everything was fine - the lag was growing on > the partitions Flink no longer knew about. > This was visible on a metric called “Assigned partitions” > (KafkaSourceReader_KafkaConsumer_assigned_partitions): > !image-2024-01-11-16-30-47-466.png|width=1046,height=254! > We see on the chart that the job used to know about 20 partitions, and then > this number got dropped to 16. > This drop has been quickly connected to the job’s scaling events. 
Or, more > precisely, to the scaling of the source operator - with almost 100% > probability any scaling of the source operator led to partitions loss. > h2. Investigation > We've conducted the investigation. We use the latest Kubernetes operator and > deploy jobs with Native Kubernetes. > The reproducing scenario we used for investigation: > * Launch a job with source operator parallelism = 4, enable DEBUG logging > * Wait until it takes the first checkpoint > * Scale-up the source operator to say 5 (no need to wait for autoscaling, it > can be done via Flink UI) > * Wait until the new checkpoint is taken > * Scale-down the source operator to 3 > These simple actions with almost 100% probability led to some partitions get > lost. > After that we've downloaded all the logs and inspected them. Noticed these > strange records in logs: > {code:java} > {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring > state for 4 split(s) to reader.","service_name":"data-beaver"} > {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding > split(s) to reader: > [ > [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, > StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code} > We see that some task being restored with 4 splits, however actual splits > have duplicates - we see that in reality 2 unique partitions have been added > ({_}eventmesh-video-play-v1-6{_} and 
{_}eventmesh-video-play-v1-19{_}). > Digging into the code and the logs a bit more, log lines like this started > looking suspicious: > > {code:java} > {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG", > "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state > SubtaskState{operatorStateFromBackend=StateObjectCollection{ > [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, > 244], distributionMode=SPLIT_DISTRIBUTE}}, >
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
zhuzhurk commented on code in PR #24025: URL: https://github.com/apache/flink/pull/24025#discussion_r1451774802 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ## @@ -140,7 +140,7 @@ public StreamGraph( ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, SavepointRestoreSettings savepointRestoreSettings) { -this.jobConfiguration = checkNotNull(jobConfiguration); +this.jobConfiguration = new Configuration(jobConfiguration); Review Comment: Maybe `new Configuration(checkNotNull(jobConfiguration));`? ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java: ## @@ -446,16 +446,20 @@ void testBufferTimeoutEnabled() { void testBufferTimeoutDisabled() { Configuration config = new Configuration(); config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false); -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); Review Comment: Not sure why passing the config in here? It will be used in `env.configure()` right after. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
zhuzhurk commented on code in PR #24025: URL: https://github.com/apache/flink/pull/24025#discussion_r1451772433 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java: ## @@ -82,7 +83,8 @@ public void setUp() { @Test public void testImmediateCheckpointing() throws Exception { env.setRestartStrategy(RestartStrategies.noRestart()); -env.enableCheckpointing(Long.MAX_VALUE - 1); +env.enableCheckpointing( +Duration.ofNanos(Long.MAX_VALUE /* max allowed by FLINK */).toMillis()); Review Comment: I see. What confused me is that it changes `Long.MAX_VALUE - 1` to `Long.MAX_VALUE`. I don't think these two numbers make an actual difference, though. Besides that, I can see other usages of `enableCheckpointing(...)` which pass in a large number, e.g. `CheckpointAfterAllTasksFinishedITCase#testRestoreAfterSomeTasksFinished()` and `RestoreUpgradedJobITCase#runOriginalJob()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]
jeyhunkarimov commented on PR #23470: URL: https://github.com/apache/flink/pull/23470#issuecomment-189017 Hi @JingGe @xuyangzhong thanks for your reviews. I updated the PR addressing your comments. I think the CI failure is not related to this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]
jeyhunkarimov commented on code in PR #23470: URL: https://github.com/apache/flink/pull/23470#discussion_r1451764950 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala: ## @@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) val tableConfig = planner.getTableConfig // build RelNodeBlock plan val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig) +val miniBatchRequirementChecker = { + (node: RelNode) => +node.isInstanceOf[Filter] || Review Comment: Hi @xuyangzhong thanks for your comment. Good point. There are two main reasons I think we should not move this logic elsewhere than `StreamCommonSubGraphBasedOptimizer`: - There is a small difference in the logic when we move the logic to the `MiniBatchIntervalInferRule`. In `MiniBatchIntervalInferRule` the check for the `miniBatchEnabled` is calculated via global variables: ``` val miniBatchEnabled = tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED) ``` Therefore, it is deterministic. If we calculate the mini-batch skipping logic in `MiniBatchIntervalInferRule`, the result will not be deterministic, because the optimizer will invoke `MiniBatchIntervalInferRule::onMatch` with different parts of the query plan (not necessarily only with the root of the query plan). - Also, there are many parts of the code that decide whether mini batch enabled via checking the global configuration variable. Therefore, it might be better to unset the global configuration inside `StreamCommonSubGraphBasedOptimizer` and set the original configuration variable at the end. I used `try...finally` for that. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]
jeyhunkarimov commented on code in PR #23470: URL: https://github.com/apache/flink/pull/23470#discussion_r1451763272 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala: ## @@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) val tableConfig = planner.getTableConfig // build RelNodeBlock plan val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig) +val miniBatchRequirementChecker = { + (node: RelNode) => +node.isInstanceOf[Filter] || Review Comment: Agreed, done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]
jeyhunkarimov commented on code in PR #23470: URL: https://github.com/apache/flink/pull/23470#discussion_r1451763251 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala: ## @@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) val tableConfig = planner.getTableConfig // build RelNodeBlock plan val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig) +val miniBatchRequirementChecker = { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]
jeyhunkarimov commented on code in PR #23470: URL: https://github.com/apache/flink/pull/23470#discussion_r1451763129 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala: ## @@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) val tableConfig = planner.getTableConfig // build RelNodeBlock plan val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig) +val miniBatchRequirementChecker = { + (node: RelNode) => +node.isInstanceOf[Filter] || +node.isInstanceOf[Project] || +node.isInstanceOf[TableScan] || +(node.isInstanceOf[LogicalUnion] && node.asInstanceOf[LogicalUnion].all) +} // infer trait properties for sink block sinkBlocks.foreach { sinkBlock => // don't require update before by default sinkBlock.setUpdateBeforeRequired(false) - val miniBatchInterval: MiniBatchInterval = - if (tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)) { + if ( + tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED) && +!sinkBlock.containsAll(miniBatchRequirementChecker) Review Comment: Agreed. Also, I moved the mini-batch skipping logic from `RelnodeBlock` to `StreamCommonSubGraphBasedOptimizer:: shouldSkipMiniBatch`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]
jeyhunkarimov commented on code in PR #23470: URL: https://github.com/apache/flink/pull/23470#discussion_r1451762837 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/MiniBatchOptimizationTest.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.analyze; + +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.planner.delegation.StreamPlanner; +import org.apache.flink.table.planner.plan.optimize.RelNodeBlock; +import org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer; +import org.apache.flink.table.planner.plan.trait.MiniBatchIntervalTrait; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; + +import org.apache.calcite.rel.RelNode; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import static org.junit.Assert.assertTrue; + +/** + * Test for enabling/disabling mini-batch assigner operator based on query plan. The optimization is + * performed in {@link StreamCommonSubGraphBasedOptimizer}. 
+ */ +@RunWith(Parameterized.class) +public class MiniBatchOptimizationTest extends TableTestBase { + +private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault()); +private final StreamTableEnvironment streamTableEnv = +StreamTableEnvironment.create(util.getStreamEnv()); + +@Parameterized.Parameter public boolean isMiniBatchEnabled; + +@Parameterized.Parameter(1) +public long miniBatchLatency; + +@Parameterized.Parameter(2) +public long miniBatchSize; + +@Before +public void before() { +streamTableEnv +.getConfig() +.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, isMiniBatchEnabled) +.set( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, +Duration.ofSeconds(miniBatchLatency)) +.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, miniBatchSize); +streamTableEnv.executeSql( +"CREATE TABLE MyTableA (\n" ++ " a BIGINT,\n" ++ " b INT NOT NULL,\n" ++ " c VARCHAR,\n" ++ " d BIGINT\n" ++ ") WITH (\n" ++ " 'connector' = 'values',\n" ++ " 'bounded' = 'false')"); +streamTableEnv.executeSql( +"CREATE TABLE MyTableB (\n" ++ " a BIGINT,\n" ++ " b INT NOT NULL,\n" ++ " c VARCHAR,\n" ++ " d BIGINT\n" ++ ") WITH (\n" ++ " 'connector' = 'values',\n" ++ " 'bounded' = 'false')"); +} + +private boolean containsMiniBatch(String sql) { +final Table result = streamTableEnv.sqlQuery(sql); +RelNode relNode = TableTestUtil.toRelNode(result); +StreamPlanner planner = +(StreamPlanner) ((TableEnvironmentImpl) streamTableEnv).getPlanner(); +StreamCommonSubGraphBasedOptimizer optimizer = +new StreamCommonSubGraphBasedOptimizer(planner); +Seq nodeSeq = + JavaConverters.asScalaIteratorConverter(Arrays.asList(relNode).iterator()) +.asScala() +.toSeq(); +Seq blockSeq = optimizer.doOptimize(nodeSeq); +List blockList =
[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Khanh Vu updated FLINK-34076: - Description: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused the Kinesis table sink to fail to be created when using the Table API, because the required classes from `flink-connector-base` are not loaded at runtime. E.g. with only the following dependency in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} and a minimal job definition: {code:java} public static void main(String[] args) throws Exception { // create data stream environment StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv); Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build(); TableDescriptor descriptor = TableDescriptor.forConnector("kinesis") .schema(a) .format("json") .build(); tEnv.createTemporaryTable("sinkTable", descriptor); tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print(); } {code} the following exception will be thrown: {code:java} Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?] at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] ... 
28 more {code} The fix is to explicitly specify `flink-connector-base` as dependency of the project: {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided {code} In general, `flink-connector-base` should be pulled in by default when pulling in the kinesis connector, the current separation adds unnecessary hassle to use the connector. was: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in runtime. E.g. with following depenency only in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} and a minimal job definition: {code:java} public static void main(String[] args) throws Exception { // create data stream environment StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv); Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build(); TableDescriptor descriptor = TableDescriptor.forConnector("kinesis") .schema(a) .format("json") .option("stream", "abc") .option("aws.region", "eu-central-1") .build(); tEnv.createTemporaryTable("sinkTable", descriptor); tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print(); } {code} following exception will be thrown: {code:java} Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?] at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?] 
at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] ... 28 more {code} The fix is to explicitly
[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Khanh Vu updated FLINK-34076: - Description: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in runtime. E.g. with following depenency only in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} and a minimal job definition: {code:java} public static void main(String[] args) throws Exception { // create data stream environment StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv); Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build(); TableDescriptor descriptor = TableDescriptor.forConnector("kinesis") .schema(a) .format("json") .option("stream", "abc") .option("aws.region", "eu-central-1") .build(); tEnv.createTemporaryTable("sinkTable", descriptor); tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print(); } {code} following exception will be thrown: {code:java} Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?] at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?] at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?] ... 
28 more {code} The fix is to explicitly specify `flink-connector-base` as dependency of the project: {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided {code} In general, `flink-connector-base` should be pulled in by default when pulling in the kinesis connector, the current separation adds unnecessary hassle to use the connector. was: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in runtime. E.g. with following depenency only in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} following exception will be thrown: {code:java} Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) {code} The fix is to explicitly specify `flink-connector-base` as dependency of the project: {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided {code} In general, `flink-connector-base` should be pulled in by default when pulling in the connector, the current separation adds unnecessary hassle to use the connector. > flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors /
Re: [PR] [FLINK-33741]Expose Rocksdb Histogram statistics in Flink metrics [flink]
flinkbot commented on PR #24050: URL: https://github.com/apache/flink/pull/24050#issuecomment-1890987057 ## CI report: * 64ca2d0a5711ffa6a5121db073fbeceaba9178b6 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Khanh Vu updated FLINK-34076: - Description: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in runtime. E.g. with following depenency only in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} following exception will be thrown: {code:java} Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) {code} The fix is to explicitly specify `flink-connector-base` as dependency of the project: {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided {code} In general, `flink-connector-base` should be pulled in by default when pulling in the connector, the current separation adds unnecessary hassle to use the connector. was: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in execution. E.g. 
with following depenency only in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} following exception will be thrown: {code:java} Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) {code} The fix is to explicitly specify `flink-connector-base` as dependency of the project: {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided {code} In general, `flink-connector-base` should be pulled in by default when pulling in the connector, the current separation adds unnecessary hassle to use the connector. > flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Khanh Vu >Priority: Major > > The > [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] > which stops bundling `flink-connector-base` with `flink-connector-kinesis` > has caused kinesis sink failing to create when using Table API as required > classes from `flink-connector-base` are not loaded in runtime. > E.g. with following depenency only in pom.xml > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > {code} > following exception will be thrown: > {code:java} > Caused by: org.apache.flink.table.api.ValidationException: Connector > 'kinesis' can only be used as a source. 
It cannot be used as a sink. > at > org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) > at > org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) > at >
[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Khanh Vu updated FLINK-34076: - Description: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in execution. E.g. with following depenency only in pom.xml {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} {code} following exception will be thrown: {code:java} Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) {code} The fix is to explicitly specify `flink-connector-base` as dependency of the project: {code:java} org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided {code} In general, `flink-connector-base` should be pulled in by default when pulling in the connector, the current separation adds unnecessary hassle to use the connector. was: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in execution. {{E.g. 
with following depenency only in pom.xml}} {{```}} Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink.}} {{at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)}} {{at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)}} {{at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)}} {{... 20 more {{```}} following exception will be thrown: ``` {{Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) ... 20 more}} ``` Workaround is to explicitly specify `flink-connector-base` as dependency of the project: ``` {{ org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided }} ``` In general, `flink-connector-base` should be pulled in by default when pulling in the connector, the current separation adds unnecessary hassle to use the connector. 
> flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Khanh Vu >Priority: Major > > The > [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] > which stops bundling `flink-connector-base` with `flink-connector-kinesis` > has caused kinesis sink failing to create when using Table API as required > classes from `flink-connector-base` are not loaded in execution. > E.g. with following depenency only in pom.xml > {code:java} > > org.apache.flink > flink-connector-kinesis > ${flink.connector.kinesis.version} > > {code} > following exception will be thrown: > {code:java} > Caused by: org.apache.flink.table.api.ValidationException: Connector > 'kinesis' can only be used as a source. It cannot be used as a sink. > at > org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) > at >
[jira] [Updated] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Khanh Vu updated FLINK-34076: - Description: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in execution. {{E.g. with following depenency only in pom.xml}} {{```}} Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink.}} {{at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)}} {{at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)}} {{at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)}} {{... 20 more {{```}} following exception will be thrown: ``` {{Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) ... 20 more}} ``` Workaround is to explicitly specify `flink-connector-base` as dependency of the project: ``` {{ org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided }} ``` In general, `flink-connector-base` should be pulled in by default when pulling in the connector, the current separation adds unnecessary hassle to use the connector. 
was: The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in execution. E.g. with following depenency only in pom.xml ``` {{Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) ... 20 more}} ``` following exception will be thrown: ``` {{Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) ... 20 more}} ``` Workaround is to explicitly specify `flink-connector-base` as dependency of the project: ``` {{ org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided }} ``` In general, `flink-connector-base` should be pulled in by default when pulling in the connector, the current separation adds unnecessary hassle to use the connector. 
> flink-connector-base missing fails kinesis table sink to create > --- > > Key: FLINK-34076 > URL: https://issues.apache.org/jira/browse/FLINK-34076 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis >Affects Versions: aws-connector-4.2.0 >Reporter: Khanh Vu >Priority: Major > > The > [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] > which stops bundling `flink-connector-base` with `flink-connector-kinesis` > has caused kinesis sink failing to create when using Table API as required > classes from `flink-connector-base` are not loaded in execution. > {{E.g. with following depenency only in pom.xml}} > {{```}} > Caused by: org.apache.flink.table.api.ValidationException: Connector > 'kinesis' can only be used as a source. It cannot be used as a sink.}} > {{at >
[jira] [Created] (FLINK-34076) flink-connector-base missing fails kinesis table sink to create
Khanh Vu created FLINK-34076: Summary: flink-connector-base missing fails kinesis table sink to create Key: FLINK-34076 URL: https://issues.apache.org/jira/browse/FLINK-34076 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Affects Versions: aws-connector-4.2.0 Reporter: Khanh Vu The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a] which stops bundling `flink-connector-base` with `flink-connector-kinesis` has caused kinesis sink failing to create when using Table API as required classes from `flink-connector-base` are not loaded in execution. E.g. with following depenency only in pom.xml ``` {{Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) ... 20 more}} ``` following exception will be thrown: ``` {{Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink. at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756) at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265) ... 
20 more}} ``` Workaround is to explicitly specify `flink-connector-base` as dependency of the project: ``` {{ org.apache.flink flink-connector-kinesis ${flink.connector.kinesis.version} org.apache.flink flink-connector-base ${flink.version} provided }} ``` In general, `flink-connector-base` should be pulled in by default when pulling in the connector, the current separation adds unnecessary hassle to use the connector. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33741]Expose Rocksdb Histogram statistics in Flink metrics [flink]
Myasuka commented on code in PR #24050: URL: https://github.com/apache/flink/pull/24050#discussion_r1451751194 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java: ## @@ -302,6 +303,20 @@ public class RocksDBNativeMetricOptions implements Serializable { .withDescription( "Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB."); +public static final ConfigOption MONITOR_DB_GET = +ConfigOptions.key("state.backend.rocksdb.metrics.db_get") +.booleanType() +.defaultValue(false) +.withDescription( +"Monitor the metric that measures the time taken for a RocksDB database to perform a read operation in microseconds"); + +public static final ConfigOption MONITOR_DB_WRITE = +ConfigOptions.key("state.backend.rocksdb.metrics.db_write") Review Comment: Same here, I think `state.backend.rocksdb.metrics.db-write` looks better. ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java: ## @@ -231,4 +252,88 @@ public void update() { setStatistics(this); } } + +/** + * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database. Review Comment: It's not a gauge here. ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java: ## @@ -231,4 +252,88 @@ public void update() { setStatistics(this); } } + +/** + * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database. 
+ */ +class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView implements Histogram { +private final HistogramType histogramType; + +private HistogramData histogramData; + +private RocksDBNativeHistogramStatisticsMetricView(HistogramType histogramType) { +this.histogramType = histogramType; +} + +void setValue(HistogramData histogramData) { +this.histogramData = histogramData; +} + +@Override +public void update() { +setHistogramStatistics(this); +} + +@Override +public void update(long value) { +} + +@Override +public long getCount() { +return histogramData == null ? 0 : histogramData.getCount(); +} + +@Override +public HistogramStatistics getStatistics() { +return new RocksDBNativeHistogram(); +} + +class RocksDBNativeHistogram extends HistogramStatistics { + +@Override +public double getQuantile(double quantile) { +if (histogramData == null) { +return 0; +} else if (quantile == 0.5) { +return histogramData.getMedian(); +} else if (quantile == 0.95) { +return histogramData.getPercentile95(); +} else if (quantile == 0.99) { +return histogramData.getPercentile99(); +} else { +return 0; +} +} + +@Override +public long[] getValues() { +return new long[0]; +} + +@Override +public int size() { +return 0; Review Comment: Why not use `histogramData.getCount()`? ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java: ## @@ -302,6 +303,20 @@ public class RocksDBNativeMetricOptions implements Serializable { .withDescription( "Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB."); +public static final ConfigOption MONITOR_DB_GET = +ConfigOptions.key("state.backend.rocksdb.metrics.db_get") Review Comment: Please follow the metrics options style in this class, use `state.backend.rocksdb.metrics.db-get`. 
## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java: ## @@ -231,4 +252,88 @@ public void update() { setStatistics(this); } } + +/** + * A gauge which periodically pulls a RocksDB statistics-based native statistic metric for the database. + */ +class RocksDBNativeHistogramStatisticsMetricView extends RocksDBNativeView implements Histogram { +private final HistogramType
Re: [PR] [FLINK-34072][scripts] Replace java command to JAVA_RUN in config.sh [flink]
flinkbot commented on PR #24085: URL: https://github.com/apache/flink/pull/24085#issuecomment-1890975682 ## CI report: * 6b6151ac82aa466850d93eefd7951e1fd65ca01b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34072) Use JAVA_RUN in shell scripts
[ https://issues.apache.org/jira/browse/FLINK-34072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34072: --- Labels: pull-request-available (was: ) > Use JAVA_RUN in shell scripts > - > > Key: FLINK-34072 > URL: https://issues.apache.org/jira/browse/FLINK-34072 > Project: Flink > Issue Type: Improvement > Components: Deployment / Scripts >Reporter: Yun Tang >Assignee: Yu Chen >Priority: Minor > Labels: pull-request-available > Fix For: 1.19.0 > > > We should call {{JAVA_RUN}} in all cases when we launch the {{java}} command; > otherwise we might not be able to run {{java}} if JAVA_HOME is not set, > resulting in errors such as: > {code:java} > flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT/bin/config.sh: line 339: > 17 : > syntax error: operand expected (error token is "> 17 ") > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34072][scripts] Replace java command to JAVA_RUN in config.sh [flink]
yuchen-ecnu opened a new pull request, #24085: URL: https://github.com/apache/flink/pull/24085 ## What is the purpose of the change Replace the `java` command with `JAVA_RUN` in scripts. ## Brief change log - Replace `java` with `JAVA_RUN` in config.sh ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33057][Checkpointing] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]
Zakelly commented on PR #23509: URL: https://github.com/apache/flink/pull/23509#issuecomment-1890974314 > Thanks for the update. The current PR LGTM, except for the conflict. Could you rebase all commits onto master to resolve the conflict? BTW, please remember to add the component tag to the title. Already done, thanks @masteryhx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33057] Add options to disable creating job-id subdirectories under the checkpoint directory [flink]
Myasuka commented on code in PR #23509: URL: https://github.com/apache/flink/pull/23509#discussion_r1451744345 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java: ## @@ -72,6 +73,7 @@ public MemoryBackendCheckpointStorageAccess( JobID jobId, @Nullable Path checkpointsBaseDirectory, @Nullable Path defaultSavepointLocation, +boolean createCheckpointSubDirs, Review Comment: I have no strong objection here. We can keep the current design. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34072) Use JAVA_RUN in shell scripts
[ https://issues.apache.org/jira/browse/FLINK-34072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806474#comment-17806474 ] Yun Tang commented on FLINK-34072: -- [~Yu Chen] Already assigned, please go ahead. > Use JAVA_RUN in shell scripts > - > > Key: FLINK-34072 > URL: https://issues.apache.org/jira/browse/FLINK-34072 > Project: Flink > Issue Type: Improvement > Components: Deployment / Scripts >Reporter: Yun Tang >Assignee: Yu Chen >Priority: Minor > Fix For: 1.19.0 > > > We should call {{JAVA_RUN}} in all cases when we launch the {{java}} command; > otherwise we might not be able to run {{java}} if JAVA_HOME is not set. > For example: > {code:java} > flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT/bin/config.sh: line 339: > 17 : > syntax error: operand expected (error token is "> 17 ") > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34072) Use JAVA_RUN in shell scripts
[ https://issues.apache.org/jira/browse/FLINK-34072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-34072: Assignee: Yu Chen > Use JAVA_RUN in shell scripts > - > > Key: FLINK-34072 > URL: https://issues.apache.org/jira/browse/FLINK-34072 > Project: Flink > Issue Type: Improvement > Components: Deployment / Scripts >Reporter: Yun Tang >Assignee: Yu Chen >Priority: Minor > Fix For: 1.19.0 > > > We should call {{JAVA_RUN}} in all cases when we launch the {{java}} command; > otherwise we might not be able to run {{java}} if JAVA_HOME is not set. > For example: > {code:java} > flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT/bin/config.sh: line 339: > 17 : > syntax error: operand expected (error token is "> 17 ") > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33839) Deploy Python artifacts to PyPI (need PMC role)
[ https://issues.apache.org/jira/browse/FLINK-33839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge reassigned FLINK-33839: --- Assignee: (was: Jing Ge) > Deploy Python artifacts to PyPI (need PMC role) > --- > > Key: FLINK-33839 > URL: https://issues.apache.org/jira/browse/FLINK-33839 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Priority: Major > > The release manager should create a PyPI account and ask the PMC to add this account > to the pyflink collaborator list with the Maintainer role (the PyPI admin account > info can be found here; NOTE: only visible to PMC members) to deploy the > Python artifacts to PyPI. The artifacts can be uploaded using > twine([https://pypi.org/project/twine/]). To install twine, just run: > {code:java} > pip install --upgrade twine==1.12.0 > {code} > Download the Python artifacts from dist.apache.org and upload them to pypi.org: > {code:java} > svn checkout > https://dist.apache.org/repos/dist/dev/flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > cd flink-${RELEASE_VERSION}-rc${RC_NUM} > > cd python > > # upload wheels > for f in *.whl; do twine upload --repository-url > https://upload.pypi.org/legacy/ $f $f.asc; done > > # upload source packages > twine upload --repository-url https://upload.pypi.org/legacy/ > apache-flink-libraries-${RELEASE_VERSION}.tar.gz > apache-flink-libraries-${RELEASE_VERSION}.tar.gz.asc > > twine upload --repository-url https://upload.pypi.org/legacy/ > apache-flink-${RELEASE_VERSION}.tar.gz > apache-flink-${RELEASE_VERSION}.tar.gz.asc > {code} > If the upload fails or is incorrect for some reason (e.g. a network transmission > problem), you need to delete the uploaded release package of the same version > (if it exists) and rename the artifact to > {{apache-flink-${RELEASE_VERSION}.post0.tar.gz}}, then re-upload. > (!) Note: re-uploading to pypi.org must be avoided as much as possible > because it will cause some irreparable problems.
If that happens, users > cannot install the apache-flink package by explicitly specifying the package > version, i.e. the following command "pip install > apache-flink==${RELEASE_VERSION}" will fail. Instead they have to run "pip > install apache-flink" or "pip install apache-flink==${RELEASE_VERSION}.post0" > to install the apache-flink package. > > > h3. Expectations > * Python artifacts released and indexed in the > [PyPI|https://pypi.org/project/apache-flink/] Repository -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33840) Deploy artifacts to Maven Central Repository (need PMC role)
[ https://issues.apache.org/jira/browse/FLINK-33840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jing Ge reassigned FLINK-33840: --- Assignee: (was: Jing Ge) > Deploy artifacts to Maven Central Repository (need PMC role) > > > Key: FLINK-33840 > URL: https://issues.apache.org/jira/browse/FLINK-33840 > Project: Flink > Issue Type: Sub-task >Reporter: Jing Ge >Priority: Major > > Use the [Apache Nexus repository|https://repository.apache.org/] to release > the staged binary artifacts to the Maven Central repository. In the Staging > Repositories section, find the relevant release candidate orgapacheflink-XXX > entry and click Release. Drop all other release candidates that are not being > released. > h3. Deploy source and binary releases to dist.apache.org > Copy the source and binary releases from the dev repository to the release > repository at [dist.apache.org|http://dist.apache.org/] using Subversion. > {code:java} > $ svn move -m "Release Flink ${RELEASE_VERSION}" > https://dist.apache.org/repos/dist/dev/flink/flink-${RELEASE_VERSION}-rc${RC_NUM} > https://dist.apache.org/repos/dist/release/flink/flink-${RELEASE_VERSION} > {code} > (Note: Only PMC members have access to the release repository. If you do not > have access, ask on the mailing list for assistance.) > -- This message was sent by Atlassian Jira (v8.20.10#820010)