[GitHub] bowenli86 commented on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 commented on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other … URL: https://github.com/apache/flink/pull/7643#issuecomment-462653964 Just took another look and found that this PR needs to adapt to the new flink-table structure, given that flink-table has been moved out of flink-libraries and broken into several new modules. Specifically, all the new classes need to be in flink/flink-table/flink-table-planner, where the existing ExternalCatalog and related code reside. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] bowenli86 removed a comment on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 removed a comment on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other … URL: https://github.com/apache/flink/pull/7643#issuecomment-461575592 LGTM +1
[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r255822550 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/RateLimiterFactory.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; + +import java.util.Properties; + +/** + * A RateLimiterFactory that configures and creates a rate limiter. + */ +public class RateLimiterFactory { + + /** Flag that indicates if ratelimiting is enabled. */ + private static final String RATELIMITING_FLAG = "consumer.ratelimiting.enabled"; Review comment: Personally, I prefer independent configurations if possible. 
I am wondering whether the following configuration names would be clear enough:

```
CONSUMER_MAX_BYTES_PER_SECOND_CONFIG="consumer.max.bytes.per.second"
CONSUMER_MAX_RECORDS_PER_SECOND_CONFIG="consumer.max.records.per.second"
```

In the future we can add configurations for the producers. The default value of these configurations could be set to either Long.MAX_VALUE or -1 to indicate no throttling.
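The following is a minimal sketch of how the two independent settings proposed above might be read. It uses the key names suggested in the comment and treats a missing key, -1, or Long.MAX_VALUE as "no throttling". The class `ThrottlingConfig` and its `readLimit` method are hypothetical and are not part of the Flink Kafka connector.

```java
import java.util.OptionalLong;
import java.util.Properties;

/** Hypothetical holder for the per-second limits proposed in the review comment. */
final class ThrottlingConfig {
    // Key names as suggested in the review; not an existing Flink API.
    static final String CONSUMER_MAX_BYTES_PER_SECOND_CONFIG = "consumer.max.bytes.per.second";
    static final String CONSUMER_MAX_RECORDS_PER_SECOND_CONFIG = "consumer.max.records.per.second";

    /** Sentinel meaning "do not throttle" (the comment also mentions Long.MAX_VALUE). */
    static final long NO_LIMIT = -1L;

    private ThrottlingConfig() {}

    /**
     * Returns the configured limit, or an empty OptionalLong when the key is absent,
     * set to -1, or set to Long.MAX_VALUE (all treated as "no throttling").
     */
    static OptionalLong readLimit(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            return OptionalLong.empty();
        }
        long value = Long.parseLong(raw.trim());
        if (value == NO_LIMIT || value == Long.MAX_VALUE) {
            return OptionalLong.empty();
        }
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive or -1, got " + value);
        }
        return OptionalLong.of(value);
    }
}
```

Because the two settings are independent, a job could cap bytes without capping records. Producer-side keys could be added later without touching these.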
[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r255776176 ## File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ## @@ -69,6 +70,10 @@ private static final long serialVersionUID = 2324564345203409112L; + /** Configuration to set consumer prefix for ratelimiting. **/ + private static final String CONSUMER_PREFIX = "kafka"; Review comment: Do we need this prefix? Can all the sources and sinks share the same configuration name? If we do need the prefix, can we put it somewhere like `FlinkKafkaConsumerBase`?
[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r255816157 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/RateLimiterFactory.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; + +import java.util.Properties; + +/** + * A RateLimiterFactory that configures and creates a rate limiter. + */ +public class RateLimiterFactory { Review comment: Can we move this class to a separate package so that it can be shared by all the other connectors? For example, a `flink-connector-throttling` module in `flink-connectors`.
[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r255815625 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ## @@ -482,6 +502,49 @@ void reassignPartitions(List> newPartit return new KafkaConsumer<>(kafkaProperties); } + @VisibleForTesting + RateLimiter getRateLimiter() { + return rateLimiter; + } + + // --- + // Rate limiting methods + // --- + /** +* +* @param records List of ConsumerRecords. +* @return Total batch size in bytes, including key and value. +*/ + private int getRecordBatchSize(ConsumerRecords records) { Review comment: It is a little unfortunate that we have to make an additional iteration over the records to get the sizes. Another option is putting the throttling logic in the `AbstractFetcher`.
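The helper under review sums the serialized key and value sizes of a polled batch. Below is a self-contained sketch of that computation. It uses a hypothetical `SizedRecord` stand-in for Kafka's `ConsumerRecord`, whose `serializedKeySize()`/`serializedValueSize()` report -1 for a null key or value, so negative sizes are clamped to zero. The sketch returns a `long` rather than the `int` in the diff, so that large batches cannot overflow.

```java
/** Hypothetical stand-in for Kafka's ConsumerRecord, exposing only serialized sizes. */
final class SizedRecord {
    private final int keySize;
    private final int valueSize;

    SizedRecord(int keySize, int valueSize) {
        this.keySize = keySize;
        this.valueSize = valueSize;
    }

    /** Follows Kafka's convention: -1 when the key is null. */
    int serializedKeySize() { return keySize; }

    /** Follows Kafka's convention: -1 when the value is null. */
    int serializedValueSize() { return valueSize; }
}

final class BatchSizes {
    private BatchSizes() {}

    /** Total batch size in bytes, counting key and value; null parts (size -1) count as 0. */
    static long recordBatchSize(Iterable<SizedRecord> records) {
        long total = 0L;
        for (SizedRecord r : records) {
            total += Math.max(0, r.serializedKeySize());
            total += Math.max(0, r.serializedValueSize());
        }
        return total;
    }
}
```

This is the extra pass the reviewer would like to avoid. The same two additions could instead run inside whatever loop already visits each record, such as the fetcher's emit loop, as the reviewer suggests.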
[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r255813728 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ## @@ -22,12 +22,16 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.config.RateLimiterFactory; import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue; import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; Review comment: Do we want to import the class directly from the shaded jar?
[GitHub] zhijiangW commented on issue #7549: [FLINK-11403][network] Remove ResultPartitionConsumableNotifier from ResultPartition
zhijiangW commented on issue #7549: [FLINK-11403][network] Remove ResultPartitionConsumableNotifier from ResultPartition URL: https://github.com/apache/flink/pull/7549#issuecomment-462638845 @azagrebin, thanks for your reviews! I think your suggestion makes sense and is worth trying. We would wrap the `ResultPartition` created by `ShuffleService`, together with the other fields related to consumption notification, in a separate new class. Then `RecordWriter` and `Task` only see this new wrapper class, which avoids spreading many small fields everywhere. I will give it a try and resubmit the code when it is ready.
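Here is a rough sketch of the wrapping idea. All type names are hypothetical stand-ins, not the Flink classes. The wrapper bundles a partition writer with its consumption notifier, so that callers such as `RecordWriter` or `Task` hold only one object. The sketch assumes that notification happens once, on the first written buffer, and that the writer is used from a single thread. The real trigger may differ, for example by partition type.

```java
/** Hypothetical minimal writer interface standing in for ResultPartition. */
interface PartitionWriter {
    void addBuffer(String buffer);
}

/** Hypothetical callback standing in for ResultPartitionConsumableNotifier. */
interface ConsumableNotifier {
    void notifyPartitionConsumable(String partitionId);
}

/** Wraps a writer together with its notification state, so callers see a single object. */
final class NotifyingPartitionWriter implements PartitionWriter {
    private final String partitionId;
    private final PartitionWriter delegate;
    private final ConsumableNotifier notifier;
    private boolean notified; // assumption: notify once, single-threaded writer

    NotifyingPartitionWriter(String partitionId, PartitionWriter delegate, ConsumableNotifier notifier) {
        this.partitionId = partitionId;
        this.delegate = delegate;
        this.notifier = notifier;
    }

    @Override
    public void addBuffer(String buffer) {
        delegate.addBuffer(buffer);
        if (!notified) {
            notified = true;
            notifier.notifyPartitionConsumable(partitionId);
        }
    }
}
```

With this shape, the plain writer no longer needs to know about notification at all. The wrapper is the only place that combines the two concerns.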
[GitHub] zhijiangW commented on issue #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
zhijiangW commented on issue #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#issuecomment-462637123 @azagrebin, thanks for your reviews! :) I was also a bit confused by the points you mentioned during implementation. I left some thoughts; please check whether I understood your suggestions correctly.
[GitHub] zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r255819481 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ShuffleDeploymentDescriptor.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.io.network.ConnectionID; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Deployment descriptor for shuffle specific information. + */ +public class ShuffleDeploymentDescriptor implements Serializable { Review comment: If we go this way, the interface might need an explicit `getConnectionId` method. The ICDD (`InputChannelDeploymentDescriptor`) might see either an `UnknownShuffleDeploymentDescriptor` or a `KnownShuffleDeploymentDescriptor`, and it needs a way to get the `ConnectionID` when `LocationType == Remote`.
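The suggestion could look roughly like the sketch below. It uses hypothetical stand-ins throughout: `ConnectionId` replaces Flink's `ConnectionID`, and `KnownShuffleDeploymentDescriptor` and `UnknownShuffleDeploymentDescriptor` take the names from the comment but are not existing classes. Returning `Optional` lets the ICDD ask any descriptor for a connection without an `instanceof` check.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for Flink's ConnectionID (address plus connection index). */
final class ConnectionId {
    final String address;
    final int connectionIndex;

    ConnectionId(String address, int connectionIndex) {
        this.address = Objects.requireNonNull(address);
        this.connectionIndex = connectionIndex;
    }
}

/** Shuffle-specific deployment info with an explicit accessor for the connection. */
interface ShuffleDeploymentDescriptor {
    /** Empty when the producer location is not known yet. */
    Optional<ConnectionId> getConnectionId();
}

/** Producer location known: carries the connection used to request the remote partition. */
final class KnownShuffleDeploymentDescriptor implements ShuffleDeploymentDescriptor {
    private final ConnectionId connectionId;

    KnownShuffleDeploymentDescriptor(ConnectionId connectionId) {
        this.connectionId = Objects.requireNonNull(connectionId);
    }

    @Override
    public Optional<ConnectionId> getConnectionId() {
        return Optional.of(connectionId);
    }
}

/** Producer location unknown, e.g. the consumer is deployed before the producer. */
final class UnknownShuffleDeploymentDescriptor implements ShuffleDeploymentDescriptor {
    @Override
    public Optional<ConnectionId> getConnectionId() {
        return Optional.empty();
    }
}
```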
[GitHub] zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r255818212 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ## @@ -52,30 +52,38 @@ /** The ID of the partition the input channel is going to consume. */ private final ResultPartitionID consumedPartitionId; - /** The location of the partition the input channel is going to consume. */ - private final ResultPartitionLocation consumedPartitionLocation; + /** The location type of the partition the input channel is going to consume. */ + private final LocationType locationType; + + /** The connection to use to request the remote partition. */ + private final Optional connectionId; Review comment: I also considered using `ShuffleDeploymentDescriptor` here to replace `ConnectionID`, but there are two concerns with the implementation:

1. In eager scheduling mode, once all required slots have been received, we cannot assume that the deployment order strictly follows the topology order. The consumer execution may be deployed before the producer execution. In that case, `InputChannelDeploymentDescriptor#fromEdges` may not be able to get the cached SDD directly from the producer execution, although we can still generate the `ConnectionID` from other information. Otherwise, we must either guarantee that deployment proceeds from producer to consumer, or generate the producer's SDD while deploying the consumer in `InputChannelDeploymentDescriptor#fromEdges`.
2. I also thought about introducing `UnknownShuffleDeploymentDescriptor`, but semantically it is somewhat redundant with `LocationType.Unknown`. In addition, there seem to be no specific usages elsewhere, such as `instanceof UnknownShuffleDeploymentDescriptor`. By design, the SDD should be generated by `ShuffleMaster`. The special `UnknownShuffleDeploymentDescriptor`, however, would be generated only in the `LocationType.Unknown` case, which does not go through `ShuffleMaster`.
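The alternative taken in the diff keeps a `LocationType` plus an `Optional` connection in the ICDD itself. The sketch below shows the invariant this shape implies, assuming that a connection is present exactly when the location is remote. `ChannelDescriptorSketch` is hypothetical. The enum values LOCAL, REMOTE and UNKNOWN are also an assumption, and the connection is modeled as a plain `String` address.

```java
import java.util.Objects;
import java.util.Optional;

/** Assumed location types for the consumed partition. */
enum LocationType { LOCAL, REMOTE, UNKNOWN }

/** Hypothetical input-channel descriptor: location type plus an optional connection. */
final class ChannelDescriptorSketch {
    private final LocationType locationType;
    private final Optional<String> connectionAddress;

    private ChannelDescriptorSketch(LocationType locationType, Optional<String> connectionAddress) {
        this.locationType = Objects.requireNonNull(locationType);
        this.connectionAddress = Objects.requireNonNull(connectionAddress);
        // Invariant assumed by this sketch: a connection exists if and only if the partition is remote.
        if ((locationType == LocationType.REMOTE) != connectionAddress.isPresent()) {
            throw new IllegalArgumentException("connection must be present iff location is REMOTE");
        }
    }

    static ChannelDescriptorSketch local() {
        return new ChannelDescriptorSketch(LocationType.LOCAL, Optional.empty());
    }

    static ChannelDescriptorSketch remote(String address) {
        return new ChannelDescriptorSketch(LocationType.REMOTE, Optional.of(address));
    }

    /** Used when the consumer is deployed before the producer's location is known. */
    static ChannelDescriptorSketch unknown() {
        return new ChannelDescriptorSketch(LocationType.UNKNOWN, Optional.empty());
    }

    LocationType locationType() { return locationType; }

    Optional<String> connectionAddress() { return connectionAddress; }
}
```

Here "unknown" is just an enum value rather than a separate class. This reflects the redundancy that point 2 raises against a dedicated `UnknownShuffleDeploymentDescriptor`.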
[jira] [Updated] (FLINK-11558) Translate "Ecosystem" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11558: --- Labels: pull-request-available (was: ) > Translate "Ecosystem" page into Chinese > --- > > Key: FLINK-11558 > URL: https://issues.apache.org/jira/browse/FLINK-11558 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Jark Wu >Assignee: Kaibo Zhou >Priority: Major > Labels: pull-request-available > > Translate "Ecosystem" page into Chinese. > The markdown file is located in: flink-web/ecosystem.zh.md > The url link is: https://flink.apache.org/zh/ecosystem.html > Please adjust the links in the page to Chinese pages when translating. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r255814019 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -813,24 +815,27 @@ TaskDeploymentDescriptor createDeploymentDescriptor( boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { - List> consumers = partition.getConsumers(); - + int maxParallelism; if (consumers.isEmpty()) { //TODO this case only exists for test, currently there has to be exactly one consumer in real jobs! - producedPartitions.add(ResultPartitionDeploymentDescriptor.from( - partition, - KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, - lazyScheduling)); + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; } else { Preconditions.checkState(1 == consumers.size(), - "Only one consumer supported in the current implementation! Found: " + consumers.size()); + "Only one consumer supported in the current implementation! Found: " + consumers.size()); List consumer = consumers.get(0); ExecutionJobVertex vertex = consumer.get(0).getTarget().getJobVertex(); - int maxParallelism = vertex.getMaxParallelism(); - producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling)); + maxParallelism = vertex.getMaxParallelism(); } + + PartitionShuffleDescriptor psd = PartitionShuffleDescriptor.from(targetSlot, executionId, partition, maxParallelism); + + producedPartitions.add(ResultPartitionDeploymentDescriptor.fromShuffleDescriptor(psd)); + getCurrentExecutionAttempt().cachePartitionShuffleDescriptor(partition.getIntermediateResult().getId(), psd); Review comment: From a functional perspective, caching the `TaskDeploymentDescriptor` might also make sense, but I have some other concerns:

1. The TDD structure is complicated and would take more memory if cached in full, including unnecessary fields such as `serializedJobInformation`, `serializedTaskInformation`, etc.
2. We might need to change the current collection structures of `producedPartitions` and `inputGates` in the TDD into maps, so that the required PSD and SDD can be looked up directly for other usages.
3. If we replace the current caches of the three descriptors, we might no longer need the `PartialInputChannelDeploymentDescriptor` class, if I understand correctly. But I wonder whether there are scenarios where only some input channel descriptors are unknown while the consumer execution is being deployed. When sending partition infos, we only want to send those unknown infos, so how can we distinguish them among all of the producer's cached TDDs? In other words, the currently cached `partialInputChannelDeploymentDescriptors` might be only a subset of all cached TDDs on the producer side.
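Point 3 concerns sending partition infos only for the input channels whose producer location was unknown when the consumer was deployed. The following is a sketch of that bookkeeping with hypothetical types. It records, per consumed partition ID, whether the location was known, and selects only the unknown entries for a later update. As the comment argues, this subset is what caching whole producer-side TDDs would not capture by itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical record of the channels a consumer was deployed with. */
final class PendingPartitionInfos {
    // consumed partition ID -> whether the producer location was known at consumer deployment
    private final Map<String, Boolean> locationKnown = new LinkedHashMap<>();

    void recordChannel(String partitionId, boolean known) {
        locationKnown.put(partitionId, known);
    }

    /** Only channels deployed with an unknown location need a later partition-info update. */
    List<String> partitionsNeedingUpdate() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : locationKnown.entrySet()) {
            if (!e.getValue()) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```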
[GitHub] sunjincheng121 commented on issue #5923: [FLINK-9253][network] make the maximum floating buffers count channel-type independent
sunjincheng121 commented on issue #5923: [FLINK-9253][network] make the maximum floating buffers count channel-type independent URL: https://github.com/apache/flink/pull/5923#issuecomment-462629185 Hi @NicoK, are you still following this PR? @rmetzger and I are currently planning the Flink 1.6.4 release. Do you want to include this PR in release 1.6.4? Best, Jincheng
[GitHub] zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r255814019 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -813,24 +815,27 @@ TaskDeploymentDescriptor createDeploymentDescriptor( boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { - List> consumers = partition.getConsumers(); - + int maxParallelism; if (consumers.isEmpty()) { //TODO this case only exists for test, currently there has to be exactly one consumer in real jobs! - producedPartitions.add(ResultPartitionDeploymentDescriptor.from( - partition, - KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, - lazyScheduling)); + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; } else { Preconditions.checkState(1 == consumers.size(), - "Only one consumer supported in the current implementation! Found: " + consumers.size()); + "Only one consumer supported in the current implementation! Found: " + consumers.size()); List consumer = consumers.get(0); ExecutionJobVertex vertex = consumer.get(0).getTarget().getJobVertex(); - int maxParallelism = vertex.getMaxParallelism(); - producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling)); + maxParallelism = vertex.getMaxParallelism(); } + + PartitionShuffleDescriptor psd = PartitionShuffleDescriptor.from(targetSlot, executionId, partition, maxParallelism); + + producedPartitions.add(ResultPartitionDeploymentDescriptor.fromShuffleDescriptor(psd)); + getCurrentExecutionAttempt().cachePartitionShuffleDescriptor(partition.getIntermediateResult().getId(), psd); Review comment: From a functional perspective, caching the `TaskDeploymentDescriptor` might also make sense, but I have some other concerns:

1. The TDD structure is complicated and would take more memory if cached in full, including unnecessary fields such as `serializedJobInformation`, `serializedTaskInformation`, etc.
2. We might need to change the current collection structures of `producedPartitions` and `inputGates` in the TDD into maps, so that the required PSD and SDD can be looked up directly for other usages.
3. If we replace the current caches of the three descriptors, we might no longer need the `PartialInputChannelDeploymentDescriptor` class, if I understand correctly. But I wonder whether there are scenarios where only some input channel descriptors are unknown while the consumer execution is being deployed. When sending partition infos during consumer deployment, we only want to send those unknown infos, so how can we distinguish them among all of the producer's cached TDDs? In other words, the currently cached `partialInputChannelDeploymentDescriptors` might be only a subset of the cached TDDs on the producer side.
[jira] [Updated] (FLINK-11581) Add Chinese document contribution guideline
[ https://issues.apache.org/jira/browse/FLINK-11581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11581: --- Labels: pull-request-available (was: ) > Add Chinese document contribution guideline > --- > > Key: FLINK-11581 > URL: https://issues.apache.org/jira/browse/FLINK-11581 > Project: Flink > Issue Type: Sub-task > Components: Project Website >Reporter: Jark Wu >Assignee: Forward Xu >Priority: Major > Labels: pull-request-available > > Add a guideline to introduce how to contribute Chinese docs. For example, how > it works, how to add a new Chinese page. > Maybe we can add a section to > https://flink.apache.org/contribute-documentation.html .
[jira] [Assigned] (FLINK-11581) Add Chinese document contribution guideline
[ https://issues.apache.org/jira/browse/FLINK-11581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Forward Xu reassigned FLINK-11581: -- Assignee: Forward Xu > Add Chinese document contribution guideline > --- > > Key: FLINK-11581 > URL: https://issues.apache.org/jira/browse/FLINK-11581 > Project: Flink > Issue Type: Sub-task > Components: Project Website >Reporter: Jark Wu >Assignee: Forward Xu >Priority: Major > > Add a guideline to introduce how to contribute Chinese docs. For example, how > it works, how to add a new Chinese page. > Maybe we can add a section to > https://flink.apache.org/contribute-documentation.html .
[jira] [Updated] (FLINK-11555) Translate "Contributing Code" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11555: --- Labels: pull-request-available (was: ) > Translate "Contributing Code" page into Chinese > --- > > Key: FLINK-11555 > URL: https://issues.apache.org/jira/browse/FLINK-11555 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Jark Wu >Assignee: Forward Xu >Priority: Major > Labels: pull-request-available > > Translate "Contributing Code" page into Chinese. > The markdown file is located in: flink-web/contribute-code.zh.md > The url link is: https://flink.apache.org/zh/contribute-code.html > Please adjust the links in the page to Chinese pages when translating.
[jira] [Created] (FLINK-11581) Add Chinese document contribution guideline
Jark Wu created FLINK-11581: --- Summary: Add Chinese document contribution guideline Key: FLINK-11581 URL: https://issues.apache.org/jira/browse/FLINK-11581 Project: Flink Issue Type: Sub-task Components: Project Website Reporter: Jark Wu Add a guideline to introduce how to contribute Chinese docs. For example, how it works, how to add a new Chinese page. Maybe we can add a section to https://flink.apache.org/contribute-documentation.html .
[GitHub] liyafan82 opened a new pull request #7680: [FLINK-11421][Table API & SQL] Providing more compilation options for code-generated o…
liyafan82 opened a new pull request #7680: [FLINK-11421][Table API & SQL] Providing more compilation options for code-generated o… URL: https://github.com/apache/flink/pull/7680 …perators (changes for stream jobs) ## What is the purpose of the change This PR supports JIRA [FLINK-11421]: Providing more compilation options for code-generated operators. ## Brief change log *The main changes include:* - *Provide a JCA compiler that compiles generated code with the Java Compiler API.* - *Support specifying the compilation option in the configuration file (flink-conf.yaml).* - *The default compilation option remains unchanged (Janino), so existing jobs are not affected unless explicitly configured.* ## Verifying this change This change is already covered by existing tests, such as *(flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs)
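For readers unfamiliar with the Java Compiler API, here is a minimal sketch of compiling a generated source string with `javax.tools` and loading the resulting class. It is not the PR's code, and it does not show how the PR selects between Janino and JCA. The class name `JcaCompilerSketch` is hypothetical. The sketch writes to a temporary directory instead of compiling in memory, and it needs a JDK, because `ToolProvider.getSystemJavaCompiler()` returns `null` on a plain JRE.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

/** Hypothetical helper: compiles one generated class with the JDK compiler and loads it. */
final class JcaCompilerSketch {
    private JcaCompilerSketch() {}

    static Class<?> compileAndLoad(String className, String source)
            throws IOException, ClassNotFoundException {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            // Only a JDK ships the compiler; a plain JRE returns null here.
            throw new IllegalStateException("No system Java compiler available (running on a JRE?)");
        }
        Path dir = Files.createTempDirectory("codegen");
        Path sourceFile = dir.resolve(className + ".java");
        Files.write(sourceFile, source.getBytes(StandardCharsets.UTF_8));

        // Tool.run(in, out, err, args...) returns 0 on success; null streams mean System.in/out/err.
        int exitCode = compiler.run(null, null, null, "-d", dir.toString(), sourceFile.toString());
        if (exitCode != 0) {
            throw new IllegalStateException("Compilation of " + className + " failed");
        }
        // The loader is deliberately left open so the loaded class can still resolve classes from dir.
        URLClassLoader loader = new URLClassLoader(
                new URL[] {dir.toUri().toURL()}, JcaCompilerSketch.class.getClassLoader());
        return loader.loadClass(className);
    }
}
```

Forking the full `javac` toolchain per class is heavier than Janino. This is consistent with the PR keeping Janino as the default and making JCA opt-in.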
[GitHub] flinkbot commented on issue #7680: [FLINK-11421][Table API & SQL] Providing more compilation options for code-generated o…
flinkbot commented on issue #7680: [FLINK-11421][Table API & SQL] Providing more compilation options for code-generated o… URL: https://github.com/apache/flink/pull/7680#issuecomment-462593057 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[jira] [Updated] (FLINK-11501) Add a ratelimiting feature to the FlinkKafkaConsumer
[ https://issues.apache.org/jira/browse/FLINK-11501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11501: --- Labels: pull-request-available (was: ) > Add a ratelimiting feature to the FlinkKafkaConsumer > > > Key: FLINK-11501 > URL: https://issues.apache.org/jira/browse/FLINK-11501 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: pull-request-available > Attachments: RateLimiting-1.png, Ratelimiting-2.png > > > There are instances when a Flink job that reads from Kafka can read at a > very high throughput (particularly while processing a backlog) and > degrade the underlying Kafka cluster. > While Kafka quotas are perhaps the best way to enforce this ratelimiting, > there are cases where such a setup is not available or easily enabled. In > such a scenario, ratelimiting on the FlinkKafkaConsumer is a useful feature. > The approach essentially involves using Guava's > [RateLimiter|https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html] > to ratelimit the bytes read from Kafka (in the > [KafkaConsumerThread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java]). > More discussion here: > [https://lists.apache.org/thread.html/8140b759ba83f33a22d809887fd2d711f5ffe7069c888eb9b1142272@%3Cdev.flink.apache.org%3E]
[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version URL: https://github.com/apache/flink/pull/7599#issuecomment-462575544 Thanks @greghogan. Could you please help merge this PR? It seems StefanRRichter is not available right now.
[GitHub] flinkbot commented on issue #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
flinkbot commented on issue #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#issuecomment-462574731 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] glaksh100 opened a new pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
glaksh100 opened a new pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679 ## What is the purpose of the change This pull request adds a ratelimiting feature to the Flink Kafka consumer. There are instances when a Flink job that reads from Kafka can read at a very high throughput (particularly while processing a backlog) and degrade the underlying Kafka cluster. While Kafka quotas are perhaps the best way to enforce this ratelimiting, there are cases where such a setup is not available or easily enabled. In such a scenario, ratelimiting on the FlinkKafkaConsumer is a useful feature. ## Brief change log - This feature is enabled via a feature flag: `kafka.consumer.ratelimiting.enabled` - A `RateLimiterFactory` is used to configure and create a Guava [RateLimiter](https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) with a desired rate. - The `consumer.poll()` part of the `run()` loop in the `KafkaConsumerThread` is extracted into a `getRecordsFromKafka()` method. - The rate is controlled by passing the number of bytes received from Kafka as the number of permits to the `acquire()` call. ## Verifying this change This change added tests and can be verified as follows: - Added a `testRateLimiting()` test in the `KafkaConsumerThreadTest` class. - Manually verified the change using a test application; screenshots of the results are available [here](https://issues.apache.org/jira/browse/FLINK-11501?focusedCommentId=16762965=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16762965).
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
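The change log describes acquiring as many permits as bytes were received. Guava may not be on every classpath, so the sketch below uses a hand-rolled, hypothetical `BytesRateLimiter` instead. It mimics one basic Guava behaviour: a request's cost is charged to the *next* caller. Guava's stored-permit bursting is omitted. The `reserve` method takes an explicit clock value, so the throttling arithmetic can be checked without sleeping.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for Guava's RateLimiter, throttling on bytes.
 * As in Guava, the cost of one request delays the next request; burst storage is omitted.
 */
final class BytesRateLimiter {

    private final double bytesPerSecond;
    /** Earliest time (in nanos) at which the next request may proceed without waiting. */
    private long nextFreeNanos;

    BytesRateLimiter(double bytesPerSecond, long nowNanos) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("bytesPerSecond must be positive");
        }
        this.bytesPerSecond = bytesPerSecond;
        this.nextFreeNanos = nowNanos;
    }

    /** Books {@code bytes} and returns how many nanos the caller must wait first. */
    synchronized long reserve(int bytes, long nowNanos) {
        long waitNanos = Math.max(0L, nextFreeNanos - nowNanos);
        long start = Math.max(nextFreeNanos, nowNanos);
        // Each byte consumes 1 / bytesPerSecond seconds of budget.
        nextFreeNanos = start + (long) (bytes * 1_000_000_000.0 / bytesPerSecond);
        return waitNanos;
    }

    /** Blocks until the given bytes fit the configured rate, e.g. after each poll(). */
    void acquire(int bytes) throws InterruptedException {
        long waitNanos = reserve(bytes, System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

In a consumer loop, one would call `limiter.acquire(bytesReceived)` after each poll. This mirrors the `acquire()` call described in the PR. At 1000 bytes/s, two back-to-back 500-byte reservations give the second caller a 0.5 s wait.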
[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#discussion_r255752753 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -395,12 +405,20 @@ public void onContainersAllocated(List containers) { nodeManagerClient.startContainer(container, taskExecutorLaunchContext); } catch (Throwable t) { log.error("Could not start TaskManager in container {}.", container.getId(), t); - // release the failed container workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(container.getId()); - // and ask for a new one - requestYarnContainerIfRequired(); + log.error("Could not start TaskManager in container {}.", container.getId(), t); + recordFailure(); + if (shouldRejectRequests()) { + rejectAllPendingSlotRequests(new MaximumFailedTaskManagerExceedingException( + String.format("Maximum number of failed container %d in interval %s" + + "is detected in Resource Manager", maximumFailureTaskExecutorPerInternal, + failureInterval.toString()), t)); Review comment: done
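The diff calls `recordFailure()` and `shouldRejectRequests()` but does not show their bodies. One plausible implementation keeps a sliding window of failure timestamps. This is an assumption, not code from the PR. Failures older than the interval are evicted, and requests are rejected once the remaining failures reach the configured maximum. Whether the threshold should be "reaches" or "exceeds" is also an assumption. Timestamps are passed in explicitly to keep the sketch testable.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding-window failure-rate check, loosely mirroring recordFailure()/shouldRejectRequests(). */
final class FailureRater {

    private final int maximumFailures;
    private final long intervalMillis;
    /** Failure timestamps in arrival order, oldest first. */
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRater(int maximumFailures, Duration interval) {
        this.maximumFailures = maximumFailures;
        this.intervalMillis = interval.toMillis();
    }

    void recordFailure(long nowMillis) {
        failureTimestamps.addLast(nowMillis);
    }

    /** True once at least maximumFailures failures fall inside the trailing interval. */
    boolean shouldRejectRequests(long nowMillis) {
        // Evict failures that have left the interval.
        while (!failureTimestamps.isEmpty() && failureTimestamps.peekFirst() <= nowMillis - intervalMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() >= maximumFailures;
    }
}
```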
[jira] [Updated] (FLINK-9816) Support Netty configuration to enable an openSSL-based SslEngine
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9816: --- Issue Type: Sub-task (was: Improvement) Parent: FLINK-11579 > Support Netty configuration to enable an openSSL-based SslEngine > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > For a while now, Netty has supported not only the JDK's {{SSLEngine}} but > also its own engine based on openSSL, which (according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4) is significantly > faster. We should add support for using that engine instead. > This ticket is for adding the necessary parts to configure and set up an > arbitrary Netty-supported SslEngine.
[jira] [Updated] (FLINK-11453) Support SliceStream with forwardable pane info
[ https://issues.apache.org/jira/browse/FLINK-11453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11453: --- Labels: pull-request-available (was: ) > Support SliceStream with forwardable pane info > --- > > Key: FLINK-11453 > URL: https://issues.apache.org/jira/browse/FLINK-11453 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > Support a slicing operation that produces slices: > {code:java} > val slicedStream: SlicedStream = inputStream > .keyBy("key") > .sliceWindow(Time.seconds(5L)) // new “slice window” concept: to > combine >// tumble results based on discrete >// non-overlapping windows. > .aggregate(aggFunc) > {code} > {{SlicedStream}} will produce results that expose the internal state of the current {{WindowOperator}} ({{InternalAppendingState}}), which can > later be passed to a {{WindowFunction}} in a separate operator.
[GitHub] flinkbot commented on issue #7678: [FLINK-11453][DataStreamAPI] Support SliceStream with forwardable pane info using slice assigner, operator and stream
flinkbot commented on issue #7678: [FLINK-11453][DataStreamAPI] Support SliceStream with forwardable pane info using slice assigner, operator and stream URL: https://github.com/apache/flink/pull/7678#issuecomment-462519667 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] walterddr opened a new pull request #7678: [FLINK-11453][DataStreamAPI] Support SliceStream with forwardable pane info using slice assigner, operator and stream
walterddr opened a new pull request #7678: [FLINK-11453][DataStreamAPI] Support SliceStream with forwardable pane info using slice assigner, operator and stream URL: https://github.com/apache/flink/pull/7678 ## What is the purpose of the change This PR introduces a new `SlicedStream` abstraction, which produces a stream of the intermediate results buffered in the internal state of the `WindowOperator`. It introduces a `Slice` data type that holds all the necessary information about a pane slice. With this API, further processing such as the following is possible: ``` val slicedStream: SlicedStream = inputStream .keyBy("key") .sliceWindow(Time.seconds(5L)) .aggregate(aggFunc) val resultStream = slicedStream .window(Time.seconds(5000L)) .aggregate(aggFunc) ``` This makes it possible to create much more efficient sliding window operations, where elements do not have to be duplicated into each window. ## Brief change log - Added a `SliceAssigner` that assigns elements into zero or one window (a.k.a. the "slice"). - Modified the `KeyedStream` and `WindowedStream` APIs to support the creation of a `SlicedStream`. - Added the `SlicedStream` concept, which can emit slicing results. - Created the special operators `SliceOperator` and `IterableSliceOperator` to process the intermediate results. - Added `TumblingEvent/ProcessingTimeSliceAssigner` as examples. ## Verifying this change - This change is already covered by multiple tests for backward compatibility. - This change added tests and can be verified as follows: - Added integration tests for end-to-end processing of reduce, aggregation, and general apply. - Added translation tests in Scala verifying the `SlicedStream` API conversion chained with `KeyedStream`. - Added integration tests specifically testing serialization to and deserialization from state snapshots.
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not yet, awaiting review
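The efficiency argument for slicing can be illustrated outside Flink. Each element is added to exactly one tumbling slice. A sliding window is then computed by combining the partial results of the slices it covers, so no element is copied into several windows. The sketch below is a hypothetical single-key, sum-only illustration, not the PR's `SliceOperator`. It assumes the window size is a multiple of the slice size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of slice-based sliding windows (single key, sum aggregation). */
final class SliceSketch {

    private SliceSketch() {}

    /** Step 1: each {timestamp, value} pair goes into exactly one tumbling slice, keyed by slice start. */
    static TreeMap<Long, Long> sliceSums(long[][] timestampedValues, long sliceSize) {
        TreeMap<Long, Long> slices = new TreeMap<>();
        for (long[] tv : timestampedValues) {
            long sliceStart = tv[0] - Math.floorMod(tv[0], sliceSize);
            slices.merge(sliceStart, tv[1], Long::sum);
        }
        return slices;
    }

    /** Step 2: each window [start, start + windowSize), sliding by sliceSize, merges its slices' partial sums. */
    static List<Long> slidingSums(TreeMap<Long, Long> slices, long sliceSize, long windowSize) {
        List<Long> result = new ArrayList<>();
        if (slices.isEmpty()) {
            return result;
        }
        long first = slices.firstKey();
        long last = slices.lastKey();
        // Only windows that overlap at least one slice are emitted.
        for (long start = first - windowSize + sliceSize; start <= last; start += sliceSize) {
            long sum = 0;
            for (Map.Entry<Long, Long> slice : slices.subMap(start, start + windowSize).entrySet()) {
                sum += slice.getValue();
            }
            result.add(sum);
        }
        return result;
    }
}
```

Consider a slice size of 5 and a window size of 10 (in arbitrary time units) with the elements (t=0, 1), (t=3, 2), (t=5, 4) and (t=12, 8). They produce the slices {0: 3, 5: 4, 10: 8} and the sliding sums [3, 7, 12, 8]. Each element is aggregated once, and each window merges at most two partial sums.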
[jira] [Created] (FLINK-11580) Provide a dynamically-linked netty-tc
Nico Kruber created FLINK-11580: --- Summary: Provide a dynamically-linked netty-tc Key: FLINK-11580 URL: https://issues.apache.org/jira/browse/FLINK-11580 Project: Flink Issue Type: Sub-task Components: flink-shaded.git, Network Reporter: Nico Kruber Assignee: Nico Kruber In order to have an openSSL-based SSL engine available, we need a shaded netty-tc version on the classpath that relies on the openSSL libraries of the system it is running on.
[jira] [Created] (FLINK-11579) Support Netty SslEngine based on openSSL
Nico Kruber created FLINK-11579: --- Summary: Support Netty SslEngine based on openSSL Key: FLINK-11579 URL: https://issues.apache.org/jira/browse/FLINK-11579 Project: Flink Issue Type: Improvement Components: Network Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.8.0 For a while now, Netty has supported not only the JDK's SSLEngine but also its own engine based on openSSL, which (according to https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4) is significantly faster. We should add support for using that engine instead.
[jira] [Updated] (FLINK-9816) Support Netty configuration to enable an openSSL-based SslEngine
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9816: --- Summary: Support Netty configuration to enable an openSSL-based SslEngine (was: Support Netty SslEngine based on openSSL) > Support Netty configuration to enable an openSSL-based SslEngine > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > For a while now, Netty has supported not only the JDK's {{SSLEngine}} but > also its own engine based on openSSL, which (according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4) is significantly > faster. We should add support for using that engine instead.
[jira] [Updated] (FLINK-9816) Support Netty configuration to enable an openSSL-based SslEngine
[ https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-9816: --- Description: For a while now, Netty has supported not only the JDK's {{SSLEngine}} but also its own engine based on openSSL, which (according to https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4) is significantly faster. We should add support for using that engine instead. This ticket is for adding the necessary parts to configure and set up an arbitrary Netty-supported SslEngine. was: For a while now, Netty has supported not only the JDK's {{SSLEngine}} but also its own engine based on openSSL, which (according to https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4) is significantly faster. We should add support for using that engine instead. > Support Netty configuration to enable an openSSL-based SslEngine > > > Key: FLINK-9816 > URL: https://issues.apache.org/jira/browse/FLINK-9816 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > For a while now, Netty has supported not only the JDK's {{SSLEngine}} but > also its own engine based on openSSL, which (according to > https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4) is significantly > faster. We should add support for using that engine instead. > This ticket is for adding the necessary parts to configure and set up an > arbitrary Netty-supported SslEngine.
[GitHub] flinkbot edited a comment on issue #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
flinkbot edited a comment on issue #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#issuecomment-459738324 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r255616682 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ## @@ -52,30 +52,38 @@ /** The ID of the partition the input channel is going to consume. */ private final ResultPartitionID consumedPartitionId; - /** The location of the partition the input channel is going to consume. */ - private final ResultPartitionLocation consumedPartitionLocation; + /** The location type of the partition the input channel is going to consume. */ + private final LocationType locationType; + + /** The connection to use to request the remote partition. */ + private final Optional<ConnectionID> connectionId; Review comment: I thought we would just have a `ShuffleDeploymentDescriptor` here instead of a `ConnectionID`; the SDD also contains the `ConnectionID`. If the location type is `LocationType.Unknown`, the SDD field could just be a special singleton implementation of `ShuffleDeploymentDescriptor`, e.g. `UnknownShuffleDeploymentDescriptor`. Or is that coming later? The same applies to `ResultPartitionDeploymentDescriptor`.
[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r255629226 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ShuffleDeploymentDescriptor.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.io.network.ConnectionID; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Deployment descriptor for shuffle specific information. + */ +public class ShuffleDeploymentDescriptor implements Serializable { Review comment: I think it eventually needs to be an interface, probably an empty one. This class could remain the implementation for the default shuffle master. A special `UnknownShuffleDeploymentDescriptor` could then also implement the interface.
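The two review comments suggest three pieces. `ShuffleDeploymentDescriptor` becomes an (almost) empty interface. The current class becomes the default-shuffle implementation. A special singleton covers an unknown location. A sketch of that shape follows. `ConnectionIdStub` is a hypothetical placeholder for Flink's `ConnectionID`. Modelling the unknown case as an enum singleton is also an assumption, chosen because enum constants keep their identity across Java serialization.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical: the (nearly empty) interface suggested in the review. */
interface ShuffleDeploymentDescriptor extends Serializable {}

/** Hypothetical placeholder for Flink's ConnectionID, to keep the sketch self-contained. */
final class ConnectionIdStub implements Serializable {
    private static final long serialVersionUID = 1L;
    final String address;
    final int connectionIndex;

    ConnectionIdStub(String address, int connectionIndex) {
        this.address = Objects.requireNonNull(address);
        this.connectionIndex = connectionIndex;
    }
}

/** Implementation for the default shuffle master: carries the connection to the producer. */
final class DefaultShuffleDeploymentDescriptor implements ShuffleDeploymentDescriptor {
    private static final long serialVersionUID = 1L;
    private final ConnectionIdStub connectionId;

    DefaultShuffleDeploymentDescriptor(ConnectionIdStub connectionId) {
        this.connectionId = Objects.requireNonNull(connectionId);
    }

    ConnectionIdStub getConnectionId() {
        return connectionId;
    }
}

/** Singleton used while the producer's location is still unknown (e.g. with lazy scheduling). */
enum UnknownShuffleDeploymentDescriptor implements ShuffleDeploymentDescriptor {
    INSTANCE
}
```

With this shape, the input channel descriptor can hold a single `ShuffleDeploymentDescriptor` field instead of a location type plus an optional connection. Consumers check for the unknown case with `instanceof UnknownShuffleDeploymentDescriptor`.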
[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r255611695 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -813,24 +815,27 @@ TaskDeploymentDescriptor createDeploymentDescriptor( boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { - List> consumers = partition.getConsumers(); - + int maxParallelism; if (consumers.isEmpty()) { //TODO this case only exists for test, currently there has to be exactly one consumer in real jobs! - producedPartitions.add(ResultPartitionDeploymentDescriptor.from( - partition, - KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, - lazyScheduling)); + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; } else { Preconditions.checkState(1 == consumers.size(), - "Only one consumer supported in the current implementation! Found: " + consumers.size()); + "Only one consumer supported in the current implementation! Found: " + consumers.size()); List consumer = consumers.get(0); ExecutionJobVertex vertex = consumer.get(0).getTarget().getJobVertex(); - int maxParallelism = vertex.getMaxParallelism(); - producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling)); + maxParallelism = vertex.getMaxParallelism(); } + + PartitionShuffleDescriptor psd = PartitionShuffleDescriptor.from(targetSlot, executionId, partition, maxParallelism); + + producedPartitions.add(ResultPartitionDeploymentDescriptor.fromShuffleDescriptor(psd)); + getCurrentExecutionAttempt().cachePartitionShuffleDescriptor(partition.getIntermediateResult().getId(), psd); Review comment: Would it work if the complete `TaskDeploymentDescriptor` would be just cached as volatile field in `Execution`? 
Maybe then we would not need any of the three descriptor caches. What do you think?
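The question above proposes caching the complete `TaskDeploymentDescriptor` in a volatile field of `Execution`, instead of keeping three separate descriptor caches. Below is a generic sketch of such a lazily initialised volatile cache using double-checked locking. `ExecutionSketch` is hypothetical. If all accesses already happen on a single main-thread executor, the locking could probably be dropped.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical Execution-like holder that caches one complete deployment descriptor. */
final class ExecutionSketch<D> {

    // volatile: a descriptor published by one thread is safely visible to other threads.
    private volatile D cachedDescriptor;

    /** Returns the cached descriptor, creating it at most once (double-checked locking). */
    D getOrCreateDescriptor(Supplier<D> factory) {
        D descriptor = cachedDescriptor;
        if (descriptor == null) {
            synchronized (this) {
                descriptor = cachedDescriptor;
                if (descriptor == null) {
                    descriptor = factory.get();
                    cachedDescriptor = descriptor;
                }
            }
        }
        return descriptor;
    }

    /** Lets later lookups read the cached descriptor without recomputing it. */
    Optional<D> getCachedDescriptor() {
        return Optional.ofNullable(cachedDescriptor);
    }
}
```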
[jira] [Resolved] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method
[ https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-10721. - Resolution: Fixed > kafkaFetcher runFetchLoop throw exception will cause follow-up code not > execute in FlinkKafkaConsumerBase run method > - > > Key: FLINK-10721 > URL: https://issues.apache.org/jira/browse/FLINK-10721 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.6.2 >Reporter: zhaoshijie >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > In the FlinkKafkaConsumerBase run method at line 721 (master branch), if kafkaFetcher.runFetchLoop() throws an exception, the subsequent code is not executed. This happens when the discoveryLoopThread throws an exception and then, in its finally block, calls the cancel method: cancel calls kafkaFetcher.cancel, whose Kafka09Fetcher implementation calls handover.close, which in turn makes handover.pollNext throw a ClosedException. In particular, discoveryLoopError is never rethrown, so the real culprit exception is swallowed. > The failure log looks like this: > {code:java} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException > at > org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695) > at java.lang.Thread.run(Thread.java:745) > {code} > Should we modify it as follows?
> {code:java} > try { > kafkaFetcher.runFetchLoop(); > } catch (Exception e) { > // if discoveryLoopErrorRef is not null, we should throw the real culprit exception > if (discoveryLoopErrorRef.get() != null) { > throw new > RuntimeException(discoveryLoopErrorRef.get()); > } else { > throw e; > } > } > {code}
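The proposed fix rethrows the exception recorded by the discovery thread instead of the secondary `ClosedException`. A self-contained sketch of the same pattern follows. `CulpritAwareRunner` and its `FetchLoop` interface are hypothetical. Attaching the secondary exception via `addSuppressed` goes beyond the original proposal; it ensures neither failure is lost from the stack trace.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: surface the error recorded by a helper thread instead of a follow-up failure. */
final class CulpritAwareRunner {

    private CulpritAwareRunner() {}

    /** Stand-in for kafkaFetcher.runFetchLoop(). */
    interface FetchLoop {
        void run() throws Exception;
    }

    static void runFetchLoop(FetchLoop loop, AtomicReference<Exception> discoveryLoopErrorRef) throws Exception {
        try {
            loop.run();
        } catch (Exception e) {
            Exception culprit = discoveryLoopErrorRef.get();
            if (culprit != null) {
                // The discovery error is the root cause; keep the secondary failure as suppressed.
                RuntimeException wrapped = new RuntimeException(culprit);
                wrapped.addSuppressed(e);
                throw wrapped;
            }
            throw e;
        }
    }
}
```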
[jira] [Commented] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method
[ https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765226#comment-16765226 ] Tzu-Li (Gordon) Tai commented on FLINK-10721: - I think this is indirectly fixed by the life cycle rework of the FlinkKafkaConsumerBase in FLINK-10774. Will mark this as resolved for now, please re-open if you disagree. > kafkaFetcher runFetchLoop throw exception will cause follow-up code not > execute in FlinkKafkaConsumerBase run method > - > > Key: FLINK-10721 > URL: https://issues.apache.org/jira/browse/FLINK-10721 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.6.2 >Reporter: zhaoshijie >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > > In FlinkKafkaConsumerBase run method on line 721(master branch), if > kafkaFetcher.runFetchLoop() throw exception(by discoveryLoopThread throw > exception then finally execute cancel method, cancel method will execute > kafkaFetcher.cancel, this implemented Kafka09Fetcher will execute > handover.close, then result in handover.pollNext throw ClosedException),then > next code will not execute,especially discoveryLoopError not be throwed,so, > real culprit exception will be Swallowed. > failed log like this: > {code:java} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException > at > org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695) > at java.lang.Thread.run(Thread.java:745) > {code} > Shoud we modify it as follows? 
> {code:java} > try { > kafkaFetcher.runFetchLoop(); > } catch (Exception e) { > // if discoveryLoopErrorRef is not null, we should throw the real culprit exception > if (discoveryLoopErrorRef.get() != null) { > throw new RuntimeException(discoveryLoopErrorRef.get()); > } else { > throw e; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
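The proposal above boils down to one rule: when the fetch loop fails only because `cancel()` closed the handover, the discovery-loop error is the exception worth surfacing. A minimal sketch of that rule, assuming a simplified fetch loop; `CulpritPreservingRunner`, `FetchLoop` and `runWithDiscovery` are hypothetical names, not `FlinkKafkaConsumerBase` code. Unlike the proposal, it rethrows the discovery error itself and attaches the fetch-loop exception via `addSuppressed`, so neither is lost.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: surface the discovery error instead of the secondary fetch-loop failure. */
public class CulpritPreservingRunner {

    /** Hypothetical stand-in for kafkaFetcher.runFetchLoop(). */
    public interface FetchLoop {
        void run() throws Exception;
    }

    public static void runWithDiscovery(
            FetchLoop fetchLoop,
            AtomicReference<Exception> discoveryLoopErrorRef) throws Exception {
        try {
            fetchLoop.run();
        } catch (Exception fetchLoopError) {
            // The loop may only have failed because cancel() closed the handover after
            // the discovery thread died; prefer the discovery error if one was recorded.
            Exception culprit = discoveryLoopErrorRef.get();
            if (culprit != null && culprit != fetchLoopError) {
                culprit.addSuppressed(fetchLoopError);
                throw culprit;
            }
            throw fetchLoopError;
        }
        // Follow-up check that must also run when the loop returns normally.
        Exception culprit = discoveryLoopErrorRef.get();
        if (culprit != null) {
            throw culprit;
        }
    }
}
```

The trailing check mirrors the follow-up code the reporter says is skipped; in this sketch it still runs when the loop returns normally.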
[GitHub] asfgit closed pull request #7576: [FLINK-11046][elasticsearch] Fix ElasticSearch6Connector thread blocked when index failed with retry
asfgit closed pull request #7576: [FLINK-11046][elasticsearch] Fix ElasticSearch6Connector thread blocked when index failed with retry URL: https://github.com/apache/flink/pull/7576 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-11046. --- Resolution: Fixed Merged. 1.8.0: 3aa92af9a68015f240c6dcc46313202e78ea5883 1.7.2: 2f522271abf03c5584612076b549c98d76a07f0f > ElasticSearch6Connector cause thread blocked when index failed with retry > - > > Key: FLINK-11046 > URL: https://issues.apache.org/jira/browse/FLINK-11046 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.6.2 >Reporter: luoguohao >Assignee: xueyu >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > When I use the ES6 sink to index into Elasticsearch and the bulk process catches some exceptions, I try to re-index the document by calling `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` method, but things go wrong: the calling thread gets stuck there, and the thread dump shows that the `bulkprocessor` object is locked by another thread. > {code:java} > public interface ActionRequestFailureHandler extends Serializable { > void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable; > } > {code} > After reading the implementation behind `indexer.add(action)`, I found that every add operation is `synchronized`: > {code:java} > private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) { > ensureOpen(); > bulkRequest.add(request, payload); > executeIfNeeded(); > } > {code} > I also noticed that the `bulkprocessor` object is locked in the bulk process thread as well.
> The bulk process operation is in the following code: > {code:java} > public void execute(BulkRequest bulkRequest, long executionId) { > Runnable toRelease = () -> {}; > boolean bulkRequestSetupSuccessful = false; > try { > listener.beforeBulk(executionId, bulkRequest); > semaphore.acquire(); > toRelease = semaphore::release; > CountDownLatch latch = new CountDownLatch(1); > retry.withBackoff(consumer, bulkRequest, new > ActionListener() { > @Override > public void onResponse(BulkResponse response) { > try { > listener.afterBulk(executionId, bulkRequest, response); > } finally { > semaphore.release(); > latch.countDown(); > } > } > @Override > public void onFailure(Exception e) { > try { > listener.afterBulk(executionId, bulkRequest, e); > } finally { > semaphore.release(); > latch.countDown(); > } > } > }, Settings.EMPTY); > bulkRequestSetupSuccessful = true; >if (concurrentRequests == 0) { >latch.await(); > } > } catch (InterruptedException e) { > Thread.currentThread().interrupt(); > logger.info(() -> new ParameterizedMessage("Bulk request {} has been > cancelled.", executionId), e); > listener.afterBulk(executionId, bulkRequest, e); > } catch (Exception e) { > logger.warn(() -> new ParameterizedMessage("Failed to execute bulk > request {}.", executionId), e); > listener.afterBulk(executionId, bulkRequest, e); > } finally { > if (bulkRequestSetupSuccessful == false) { // if we fail on > client.bulk() release the semaphore > toRelease.run(); > } > } > } > {code} > As far as I can tell, the line I marked above is why the retry operation thread was blocked: the bulk process thread never releases the lock on `bulkprocessor`. I also tried to figure out why the field `concurrentRequests` was set to zero, and found the initialization of the bulkprocessor in class `ElasticsearchSinkBase`: > {code:java} > protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) { > ...
> BulkProcessor.Builder bulkProcessorBuilder = > callBridge.createBulkProcessorBuilder(client, listener); > // This makes flush() blocking > bulkProcessorBuilder.setConcurrentRequests(0); > > ... > return bulkProcessorBuilder.build(); > } > {code} > This field is explicitly set to zero. So everything seems to make sense, but I still wondered why the retry operation does not run in the same thread as the bulk process execution; after reading the code, I think the `bulkAsync` method might be the last piece of the puzzle. > {code:java} > @Override > public BulkProcessor.Builder
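The reporter's analysis amounts to a lock-ordering problem: with `setConcurrentRequests(0)` the flushing thread holds the `BulkProcessor` monitor while it waits for the bulk request to finish, and the retry callback needs that same monitor inside `indexer.add(action)`. One way out is for the failure handler to buffer the retried requests and have the sink thread re-add them after the flush. The sketch below models that idea only, assuming `String` requests and a hypothetical `BufferingRetrySketch` class; it is not the Elasticsearch `BulkProcessor` API, nor necessarily the patch that was merged for this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

/**
 * Hypothetical stand-in for a bulk processor whose add() and blocking flush share one
 * monitor (as with concurrentRequests == 0), plus a retry buffer filled by the failure callback.
 */
public class BufferingRetrySketch {

    private final List<String> pending = new ArrayList<>();
    // Requests the failure callback wants to retry; can be filled without holding the monitor.
    private final Queue<String> retryBuffer = new ConcurrentLinkedQueue<>();

    public synchronized void add(String request) {
        pending.add(request);
    }

    /**
     * Simulates a blocking bulk flush in which failedRequest fails and the failure
     * callback runs on another thread while this thread still holds the monitor.
     */
    public synchronized void flushWithSimulatedFailure(String failedRequest) {
        CountDownLatch bulkDone = new CountDownLatch(1);
        Thread callback = new Thread(() -> {
            // Calling add(failedRequest) here would block on the monitor held by this
            // method, so bulkDone.countDown() would never run: the hang reported above.
            retryBuffer.add(failedRequest);
            bulkDone.countDown();
        });
        callback.start();
        try {
            bulkDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        pending.clear(); // the bulk request has been sent
    }

    /** Called later by the sink thread, outside the flush: re-adds buffered retries. */
    public synchronized void drainRetries() {
        String request;
        while ((request = retryBuffer.poll()) != null) {
            pending.add(request);
        }
    }

    public synchronized List<String> pendingRequests() {
        return new ArrayList<>(pending);
    }
}
```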
[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r255613841 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -1430,6 +1430,120 @@ public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception } } + @Nonnull + private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway() { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + return testingResourceManagerGateway; + } + + /** +* Tests that the job execution is failed if the TaskExecutor disconnects from the +* JobMaster. +*/ + @Test + public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception { + runJobFailureWhenTaskExecutorTerminatesTest( + () -> heartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> jobMasterGateway.disconnectTaskManager( + localTaskManagerLocation.getResourceID(), + new FlinkException("Test disconnectTaskManager exception.")), + (jobMasterGateway, resourceID) -> ignored -> {}); + } + + @Test + public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception { + final AtomicBoolean respondToHeartbeats = new AtomicBoolean(true); + runJobFailureWhenTaskExecutorTerminatesTest( + () -> fastHeartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> respondToHeartbeats.set(false), + (jobMasterGateway, taskManagerResourceId) -> resourceId -> { + if (respondToHeartbeats.get()) { + jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new AccumulatorReport(Collections.emptyList())); + } + } + ); + } + + private void 
runJobFailureWhenTaskExecutorTerminatesTest( + Supplier<HeartbeatServices> heartbeatSupplier, + BiConsumer<LocalTaskManagerLocation, JobMasterGateway> jobReachedRunningState, + BiFunction<JobMasterGateway, ResourceID, Consumer<ResourceID>> heartbeatConsumerFunction) throws Exception { + final JobGraph jobGraph = createSingleVertexJobGraph(); + final TestingOnCompletionActions onCompletionActions = new TestingOnCompletionActions(); + final JobMaster jobMaster = createJobMaster( + new Configuration(), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build(), + heartbeatSupplier.get(), + onCompletionActions); + + createAndRegisterTestingResourceManagerGateway(); + + try { + jobMaster.start(jobMasterId).get(); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { + taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway, taskManagerLocation.getResourceID())) + .createTestingTaskExecutorGateway(); + rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); + + jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get(); + final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN); + final Collection<SlotOffer> slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(),
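`testJobFailureWhenTaskExecutorHeartbeatTimeout` simulates a dead TaskExecutor by flipping an `AtomicBoolean` that decides whether heartbeat requests are still answered. A stripped-down sketch of that gating consumer, keeping the same "(gateway, taskManagerId) -> heartbeat consumer" `BiFunction` shape but replacing the gateway with a `List<String>` recorder and resource IDs with strings; `HeartbeatGate` and its methods are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/** Hypothetical sketch of the heartbeat gate used to simulate a TaskExecutor that stops responding. */
public class HeartbeatGate {

    private final AtomicBoolean respondToHeartbeats = new AtomicBoolean(true);
    // Stands in for the JobMaster side: records which TaskExecutors answered a heartbeat.
    private final List<String> answeredHeartbeats = new CopyOnWriteArrayList<>();

    /** Same shape as heartbeatConsumerFunction: (gateway, taskManagerId) -> consumer of heartbeat requests. */
    public final BiFunction<List<String>, String, Consumer<String>> heartbeatConsumerFunction =
            (gateway, taskManagerId) -> requesterId -> {
                // Only answer while the simulated TaskExecutor is still "alive".
                if (respondToHeartbeats.get()) {
                    gateway.add(taskManagerId);
                }
            };

    public Consumer<String> consumerFor(String taskManagerId) {
        return heartbeatConsumerFunction.apply(answeredHeartbeats, taskManagerId);
    }

    /** Equivalent of the jobReachedRunningState callback in the heartbeat-timeout test. */
    public void simulateTaskExecutorDeath() {
        respondToHeartbeats.set(false);
    }

    public List<String> answeredHeartbeats() {
        return answeredHeartbeats;
    }
}
```

Passing the behaviour in as functions is what lets one runner method serve both the graceful-disconnect and the heartbeat-timeout tests.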
[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r255593757 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ## @@ -945,4 +954,12 @@ public void onFatalError(Throwable exception) { closeAsync(); } } + + private class TerminatingFatalErrorHandlerFactory { + + @GuardedBy("lock") + private TerminatingFatalErrorHandler create() { + return new TerminatingFatalErrorHandler(taskManagers.size()); Review comment: This deserves a comment. We pass `taskManagers.size()` but the constructor expects an index. One might think that this throws `ArrayIndexOutOfBoundsException`. ``` synchronized (lock) { taskManagers.get(index).shutDown(); } ```
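Presumably `taskManagers.size()` is a valid index because it is read before the new TaskExecutor is appended, so it equals the position that TaskExecutor will occupy; that invariant is what the requested comment would need to spell out. A sketch of it, where `IndexBeforeAppendSketch`, `Handler` and the `List<String>` of TaskExecutor names are hypothetical stand-ins for `MiniCluster`, `TerminatingFatalErrorHandler` and `taskManagers`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a handler created with size() before the append points at its own entry. */
public class IndexBeforeAppendSketch {

    private final Object lock = new Object();
    private final List<String> taskManagers = new ArrayList<>();

    /** Stand-in for TerminatingFatalErrorHandler: remembers the index of its TaskExecutor. */
    public final class Handler {
        private final int index;

        private Handler(int index) {
            this.index = index;
        }

        public String target() {
            synchronized (lock) {
                return taskManagers.get(index);
            }
        }
    }

    public Handler startTaskExecutor(String name) {
        synchronized (lock) {
            // size() is read before add(), so it is exactly the index the new entry gets.
            Handler handler = new Handler(taskManagers.size());
            taskManagers.add(name);
            return handler;
        }
    }
}
```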
[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r255589778 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ## @@ -696,34 +686,38 @@ protected ResourceManagerRunner startResourceManager( return resourceManagerRunner; } - protected TaskExecutor[] startTaskManagers( - Configuration configuration, - HighAvailabilityServices haServices, - HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, - BlobCacheService blobCacheService, - int numTaskManagers, - RpcServiceFactory rpcServiceFactory) throws Exception { + @GuardedBy("lock") + private void startTaskManagers() throws Exception { Review comment: Should be moved to ``` // // Internal methods // ```
[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r255600139 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java ## @@ -66,6 +67,17 @@ public TestingMiniCluster(TestingMiniClusterConfiguration miniClusterConfigurati return super.getDispatcherResourceManagerComponents(); } + @Nonnull + @Override + public CompletableFuture terminateTaskExecutor(int index) { + return super.terminateTaskExecutor(index); + } + + @Override + public void startTaskExecutor(boolean localCommunication) throws Exception { Review comment: This doesn't look like a user-friendly API. Shouldn't `MiniCluster` set this flag depending on the configuration? Wouldn't it be enough to expose a signature such as `public void startTaskExecutor()`?
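A sketch of the API shape the reviewer suggests, where callers no longer pass `localCommunication` and the cluster derives it from its own configuration. The rule used here (local communication only for a single-TaskExecutor cluster) and all names are assumptions for illustration, not `MiniCluster`'s actual logic.

```java
/** Hypothetical sketch: the public method takes no flag; the flag is derived internally. */
public class TaskExecutorStarterSketch {

    private final int numTaskManagers; // stand-in for the cluster configuration
    private int startedTaskExecutors;
    private boolean lastLocalCommunication;

    public TaskExecutorStarterSketch(int numTaskManagers) {
        this.numTaskManagers = numTaskManagers;
    }

    /** Public entry point: callers cannot pick an inconsistent flag. */
    public synchronized void startTaskExecutor() {
        doStartTaskExecutor(useLocalCommunication());
    }

    // Assumed rule, for illustration only: local communication for a single TaskExecutor.
    private boolean useLocalCommunication() {
        return numTaskManagers == 1;
    }

    private void doStartTaskExecutor(boolean localCommunication) {
        // A real implementation would create and start a TaskExecutor here.
        lastLocalCommunication = localCommunication;
        startedTaskExecutors++;
    }

    public synchronized int startedTaskExecutors() {
        return startedTaskExecutors;
    }

    public synchronized boolean lastLocalCommunication() {
        return lastLocalCommunication;
    }
}
```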
[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r255600846 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Integration tests for the {@link TaskExecutor}. 
+ */ +public class TaskExecutorITCase extends TestLogger { + + private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L); + private static final int NUM_TMS = 2; + private static final int SLOTS_PER_TM = 2; + private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM; + + private TestingMiniCluster miniCluster; + + @Before + public void setup() throws Exception { + miniCluster = new TestingMiniCluster( + new TestingMiniClusterConfiguration.Builder() + .setNumTaskManagers(NUM_TMS) + .setNumSlotsPerTaskManager(SLOTS_PER_TM) + .build(), + null); + + miniCluster.start(); + } + + @After + public void teardown() throws Exception { + if (miniCluster != null) { + miniCluster.close(); + } + } + + /** +* Tests that a job will be re-executed if a new TaskExecutor joins the cluster. +*/ + @Test + public void testNewTaskExecutorJoinsCluster() throws Exception { + + final Deadline deadline = Deadline.fromNow(TESTING_TIMEOUT); Review comment: The declaration of `deadline` is too early. It should be closer to `waitUntilCondition`.
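`Deadline.fromNow(...)` starts counting when it is created, so any time spent submitting the job before polling silently shrinks the wait budget. A self-contained sketch of a polling helper that takes its deadline at the moment polling starts; `PollingSketch.waitUntil` is hypothetical and only loosely modeled on how `CommonTestUtils.waitUntilCondition` is used above.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper whose deadline starts when polling starts. */
public final class PollingSketch {

    private PollingSketch() {}

    public static void waitUntil(BooleanSupplier condition, Duration timeout, long retryMillis)
            throws TimeoutException, InterruptedException {
        // Computed right before the loop, so earlier setup work does not consume the timeout.
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(retryMillis);
        }
    }
}
```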
[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r255591538 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ## @@ -738,6 +732,26 @@ protected ResourceManagerRunner startResourceManager( // Internal methods // + @GuardedBy("lock") + private Collection> terminateTaskExecutor() { Review comment: `terminateTaskExecutors`? (plural)
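The method produces one termination future per TaskExecutor, which is what the plural name would convey. A sketch of collecting those futures and combining them with `CompletableFuture.allOf`; `TerminationSketch`, `TaskExecutorHandle` and `closeAsync()` are hypothetical stand-ins, since the diff elides the method body.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of terminating all TaskExecutors and waiting for all of them. */
public class TerminationSketch {

    /** Stand-in for a running TaskExecutor. */
    public interface TaskExecutorHandle {
        CompletableFuture<Void> closeAsync();
    }

    /** Triggers termination of every TaskExecutor; returns one future per executor. */
    public static Collection<CompletableFuture<Void>> terminateTaskExecutors(List<TaskExecutorHandle> taskExecutors) {
        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(taskExecutors.size());
        for (TaskExecutorHandle taskExecutor : taskExecutors) {
            terminationFutures.add(taskExecutor.closeAsync());
        }
        return terminationFutures;
    }

    /** Completes once every TaskExecutor has terminated. */
    public static CompletableFuture<Void> allTerminated(Collection<CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }
}
```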
[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#discussion_r255594841 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +public class TaskExecutorITCase extends TestLogger { + + private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L); + private static final int NUM_TMS = 2; + private static final int SLOTS_PER_TM = 2; + private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM; + + private TestingMiniCluster miniCluster; + + @Before + public void setup() throws Exception { + miniCluster = new TestingMiniCluster( + new TestingMiniClusterConfiguration.Builder() + .setNumTaskManagers(NUM_TMS) + .setNumSlotsPerTaskManager(SLOTS_PER_TM) + .build(), + null); + + miniCluster.start(); + } + + @After + public void teardown() throws Exception { + if (miniCluster != null) { + miniCluster.close(); + } + } + + /** +* Tests that a job will be re-executed if a new TaskExecutor joins the cluster. +*/ + @Test + public void testNewTaskExecutorJoinsCluster() throws Exception { + + final Deadline deadline = Deadline.fromNow(TESTING_TIMEOUT); + + final JobGraph jobGraph = createJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).get(); + + final CompletableFuture jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID()); + + assertThat(jobResultFuture.isDone(), is(false)); + + CommonTestUtils.waitUntilCondition( + jobIsRunning(() -> miniCluster.getExecutionGraph(jobGraph.getJobID())), + deadline, + 20L); + + // kill one TaskExecutor which should fail the job execution + miniCluster.terminateTaskExecutor(0); + + final JobResult jobResult = jobResultFuture.get(); + + assertThat(jobResult.isSuccess(), is(false)); + + miniCluster.startTaskExecutor(false); + + BlockingOperator.unblock(); + + miniCluster.submitJob(jobGraph).get(); + + miniCluster.requestJobResult(jobGraph.getJobID()).get(); + } + + private SupplierWithException jobIsRunning(Supplier> executionGraphFutureSupplier) { + final Predicate allExecutionsRunning =
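The test relies on `BlockingOperator`, whose body is not part of the excerpt: presumably its tasks block until `BlockingOperator.unblock()` is called, which keeps the first execution running while a TaskExecutor is terminated and lets the resubmitted job finish. A sketch of that static-latch pattern using plain threads instead of Flink's `AbstractInvokable`; `BlockingTaskSketch` and its methods are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch: every invocation blocks until unblock() has been called once. */
public final class BlockingTaskSketch {

    // Static so that all task instances in one JVM share the same gate; reset() between tests.
    private static volatile CountDownLatch gate = new CountDownLatch(1);

    private BlockingTaskSketch() {}

    /** Task body: waits for the gate, then finishes normally. */
    public static void invoke() throws InterruptedException {
        gate.await();
    }

    public static void unblock() {
        gate.countDown();
    }

    public static void reset() {
        gate = new CountDownLatch(1);
    }
}
```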
[jira] [Assigned] (FLINK-11577) Check and port StackTraceSampleCoordinatorITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-11577: Assignee: (was: Gary Yao) > Check and port StackTraceSampleCoordinatorITCase to new code base if necessary > -- > > Key: FLINK-11577 > URL: https://issues.apache.org/jira/browse/FLINK-11577 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Check and port {{StackTraceSampleCoordinatorITCase}} to new code base if > necessary.
[GitHub] asfgit closed pull request #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor
asfgit closed pull request #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor URL: https://github.com/apache/flink/pull/7637
[jira] [Closed] (FLINK-11511) Remove legacy class JobAttachmentClientActor
[ https://issues.apache.org/jira/browse/FLINK-11511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-11511. Resolution: Fixed Fixed via ef2edd09d32b988a5ac0209787d28e7a48b79ddb > Remove legacy class JobAttachmentClientActor > > > Key: FLINK-11511 > URL: https://issues.apache.org/jira/browse/FLINK-11511 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Remove legacy class JobAttachmentClientActor
[GitHub] rehevkor5 commented on a change in pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions
rehevkor5 commented on a change in pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions URL: https://github.com/apache/flink/pull/7672#discussion_r255585813 ## File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java ## @@ -811,4 +818,63 @@ protected long getCurrentTimeMillis() { Assert.assertTrue("idle, no watermark", watermarks.isEmpty()); } + @Test + public void testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown() throws Exception { + final String stream = "fakeStream"; + + Map<String, List<BlockingQueue<String>>> streamsToShardQueues = new HashMap<>(); + final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10); + queue.put("item1"); + streamsToShardQueues.put(stream, Collections.singletonList(queue)); + + final AlwaysThrowsDeserializationSchema deserializationSchema = new AlwaysThrowsDeserializationSchema(); + final KinesisProxyInterface fakeKinesis = + FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamsToShardQueues); + + final TestableKinesisDataFetcherForShardConsumerException fetcher = new TestableKinesisDataFetcherForShardConsumerException<>( + Collections.singletonList(stream), + new TestSourceContext<>(), + TestUtils.getStandardProperties(), + new KinesisDeserializationSchemaWrapper<>(deserializationSchema), + 10, + 2, + new AtomicReference<>(), + new LinkedList<>(), + new HashMap<>(), + fakeKinesis); + + final DummyFlinkKinesisConsumer consumer = new DummyFlinkKinesisConsumer<>( Review comment: My IDE automatically marks things `final` wherever possible, to prevent writing accidental mutable code, but I will happily remove them.
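The test name describes the failure mode FLINK-11568 targets: a shard consumer fails (here via a deserialization schema that always throws), and an interrupt during shutdown must not replace that error with a less informative `InterruptedException`. A sketch of the idea, assuming the first consumer error is recorded in an `AtomicReference` and consumers run on an `ExecutorService`; `ErrorPreservingShutdown` and `shutdownAndRethrow` are hypothetical, not `KinesisDataFetcher` code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: keep the first consumer error visible even if shutdown is interrupted. */
public class ErrorPreservingShutdown {

    // First error reported by any shard consumer thread; later errors are ignored.
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    public void reportError(Throwable t) {
        error.compareAndSet(null, t);
    }

    /** Stops the consumers and rethrows the recorded error, even when interrupted while waiting. */
    public void shutdownAndRethrow(ExecutorService consumers, long timeoutMillis) throws Exception {
        consumers.shutdownNow();
        try {
            consumers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            // Restore the flag, but do not let the interrupt replace the real failure.
            Thread.currentThread().interrupt();
            Throwable original = error.get();
            if (original == null) {
                throw interrupted;
            }
            original.addSuppressed(interrupted);
            throw asException(original);
        }
        Throwable original = error.get();
        if (original != null) {
            throw asException(original);
        }
    }

    private static Exception asException(Throwable t) {
        return t instanceof Exception ? (Exception) t : new RuntimeException(t);
    }
}
```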
[GitHub] pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer URL: https://github.com/apache/flink/pull/7405#issuecomment-462391160 Thanks @EAlexRojas for the investigation. I have also been in the process of discovering and fixing the same problems you mentioned, and of writing tests for them. So far I have rewritten and fixed the tests provided by @tvielgouarin and encountered the same errors. TL;DR: I'm thinking about how to solve this situation, but I'm afraid it won't make it into 1.8... Full story: Regarding the 0.11 vs 2.0 dependency, having both of them on the same class path won't work because of dependency convergence, but it doesn't have to: committing/aborting 0.11 transactions with the 2.0 connector should work fine, so that shouldn't be an issue. The real problem is that `FlinkKafkaProducer` and `FlinkKafkaProducer011` have different names, and they define the static classes `NextTransactionalIdHint`, `KafkaTransactionState` and `KafkaTransactionContext` inside the parent classes. This causes incompatibility problems because, for example, `FlinkKafkaProducer011.KafkaTransactionState` and `FlinkKafkaProducer.KafkaTransactionState` are treated as completely incompatible classes, despite being identical. It can probably be solved by: 1. Custom serialization logic, like keeping fake/dummy `FlinkKafkaProducer011.XXXSerializer.XXXSerializerSnapshot` classes in the universal connector as entry points for the deserialization. 2. Adding a "force skip class compatibility check" flag to the current serialization stack. After all, the serialized binary data is exactly the same in all of those cases. This is work in progress by @tzulitai and might happen in time for the 1.8 release. 3. Adding a more powerful state migration function that would be able to change the type of a field/class. This is also on our roadmap, but won't happen in 1.8.
Either way, unfortunately I'm away for the next two weeks and cannot solve this issue before the 1.8 feature freeze, so this fix will have to wait for the 1.9 release. I have implemented working regression tests for state compatibility between Flink versions: https://github.com/apache/flink/pull/7677 A test for migration from 0.11 to the universal connector is also easy to implement: https://github.com/pnowojski/flink/tree/kafka-migration-0.11-to-universal-not-working but I didn't have time to make it work (as described above).
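The incompatibility comes down to class identity: a static nested class is named after its enclosing class (for example `Outer$Nested`), so two structurally identical nested classes in differently named outer classes are unrelated types to anything that compares classes. A self-contained illustration with hypothetical outer classes standing in for `FlinkKafkaProducer011` and `FlinkKafkaProducer`:

```java
/** Hypothetical illustration: identical nested classes in different outer classes are different classes. */
public class NestedClassIdentityDemo {

    /** Stands in for FlinkKafkaProducer011. */
    static class Producer011 {
        static class KafkaTransactionState {
            String transactionalId;
        }
    }

    /** Stands in for the universal FlinkKafkaProducer. */
    static class ProducerUniversal {
        static class KafkaTransactionState {
            String transactionalId;
        }
    }

    public static String binaryName011() {
        return Producer011.KafkaTransactionState.class.getName();
    }

    public static String binaryNameUniversal() {
        return ProducerUniversal.KafkaTransactionState.class.getName();
    }

    /** False: same fields, but distinct Class objects with distinct binary names. */
    public static boolean sameClass() {
        return Producer011.KafkaTransactionState.class.equals(ProducerUniversal.KafkaTransactionState.class);
    }
}
```

Field-for-field identity does not help here, which is why the options above either keep the old class names around as entry points or relax the class compatibility check.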
[GitHub] flinkbot commented on issue #7677: [FLINK-11249][kafka] Add migration tests for FlinkKafkaProdcuer and FlinkKafkaProducer011
flinkbot commented on issue #7677: [FLINK-11249][kafka] Add migration tests for FlinkKafkaProdcuer and FlinkKafkaProducer011 URL: https://github.com/apache/flink/pull/7677#issuecomment-462389051 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] pnowojski opened a new pull request #7677: [FLINK-11249][kafka] Add migration tests for FlinkKafkaProdcuer and FlinkKafkaProducer011
pnowojski opened a new pull request #7677: [FLINK-11249][kafka] Add migration tests for FlinkKafkaProdcuer and FlinkKafkaProducer011 URL: https://github.com/apache/flink/pull/7677 This PR doesn't solve the problem of migrating from 0.11 to the universal connector; it just adds regression tests to make sure that the producers of those two connectors have state compatibility with previous Flink releases. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[GitHub] flinkbot edited a comment on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
flinkbot edited a comment on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#issuecomment-462382064 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @GJL [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @GJL [committer] * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] GJL commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
GJL commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#issuecomment-462387760 @flinkbot approve description @flinkbot approve consensus
[GitHub] flinkbot commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
flinkbot commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676#issuecomment-462382064 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[jira] [Updated] (FLINK-11364) Check and port TaskManagerFailsITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11364: --- Labels: pull-request-available (was: ) > Check and port TaskManagerFailsITCase to new code base if necessary > --- > > Key: FLINK-11364 > URL: https://issues.apache.org/jira/browse/FLINK-11364 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > > Check and port {{TaskManagerFailsITCase}} to new code base if necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann opened a new pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base
tillrohrmann opened a new pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base URL: https://github.com/apache/flink/pull/7676 ## What is the purpose of the change Port `TaskManagerFailsITCase` to new code base. ## Brief change log - "detect a failing task manager" --> JobMaster#testHeartbeatTimeoutWithTaskManager - "handle gracefully failing task manager" --> JobMasterTest#testJobFailureWhenGracefulTaskExecutorTermination - "handle hard failing task manager" --> JobMasterTest#testJobFailureWhenTaskExecutorHeartbeatTimeout - "go into a clean state in case of a TaskManager failure" --> TaskExecutorITCase#testNewTaskExecutorJoinsCluster ## Verifying this change - Run ported tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
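The first entry in the change log maps "detect a failing task manager" to a heartbeat-timeout test. As a rough illustration of what such a test exercises, here is a minimal sketch assuming a hypothetical `HeartbeatMonitor` class (this is not Flink's `HeartbeatManager` API): a TaskManager counts as failed once no heartbeat has arrived within the timeout. Time is passed in explicitly so a test can simulate the timeout without sleeping.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified heartbeat monitor; not Flink's HeartbeatManager.
 * Callers pass the current time explicitly, which keeps timeout tests deterministic.
 */
class HeartbeatMonitor {
    private final long timeoutMillis;
    // Last time (in millis) a heartbeat was received from each TaskManager.
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a heartbeat from the given TaskManager at time {@code nowMillis}. */
    void receiveHeartbeat(String taskManagerId, long nowMillis) {
        lastHeartbeat.put(taskManagerId, nowMillis);
    }

    /** Returns the TaskManagers whose last heartbeat is older than the timeout. */
    Set<String> timedOut(long nowMillis) {
        Set<String> failed = new HashSet<>();
        for (Map.Entry<String, Long> entry : lastHeartbeat.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }
}
```

A test in this style records heartbeats, advances the simulated clock past the timeout, and asserts that only the silent TaskManager is reported as failed.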
[jira] [Updated] (FLINK-11578) Check and port BackPressureStatsTrackerImplITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-11578: - Description: Check and port {{BackPressureStatsTrackerImplITCase}} to new code base if necessary. (was: Check and port {{StackTraceSampleCoordinatorITCase}} to new code base if necessary.) > Check and port BackPressureStatsTrackerImplITCase to new code base if > necessary > --- > > Key: FLINK-11578 > URL: https://issues.apache.org/jira/browse/FLINK-11578 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Check and port {{BackPressureStatsTrackerImplITCase}} to new code base if > necessary.
[jira] [Created] (FLINK-11577) Check and port StackTraceSampleCoordinatorITCase to new code base if necessary
Gary Yao created FLINK-11577: Summary: Check and port StackTraceSampleCoordinatorITCase to new code base if necessary Key: FLINK-11577 URL: https://issues.apache.org/jira/browse/FLINK-11577 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Gary Yao Assignee: Gary Yao Fix For: 1.8.0 Check and port {{ClusterShutdownITCase}} to new code base if necessary.
[jira] [Created] (FLINK-11578) Check and port BackPressureStatsTrackerImplITCase to new code base if necessary
Gary Yao created FLINK-11578: Summary: Check and port BackPressureStatsTrackerImplITCase to new code base if necessary Key: FLINK-11578 URL: https://issues.apache.org/jira/browse/FLINK-11578 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Gary Yao Assignee: Gary Yao Fix For: 1.8.0 Check and port {{StackTraceSampleCoordinatorITCase}} to new code base if necessary.
[jira] [Updated] (FLINK-11577) Check and port StackTraceSampleCoordinatorITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-11577: - Description: Check and port {{StackTraceSampleCoordinatorITCase}} to new code base if necessary. (was: Check and port {{ClusterShutdownITCase}} to new code base if necessary.) > Check and port StackTraceSampleCoordinatorITCase to new code base if necessary > -- > > Key: FLINK-11577 > URL: https://issues.apache.org/jira/browse/FLINK-11577 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Check and port {{StackTraceSampleCoordinatorITCase}} to new code base if > necessary.
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255547663 ## File path: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; + +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY; + +/** + * DataStreamAllroundTestProgram on Minicluster. + */ +public class AllroundMiniClusterTest { Review comment: No, it is only intended for manual debugging. It can be ignored, and it could be moved to test nevertheless.
[jira] [Updated] (FLINK-11511) Remove legacy class JobAttachmentClientActor
[ https://issues.apache.org/jira/browse/FLINK-11511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-11511: - Description: Remove legacy class JobAttachmentClientActor > Remove legacy class JobAttachmentClientActor > > > Key: FLINK-11511 > URL: https://issues.apache.org/jira/browse/FLINK-11511 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Remove legacy class JobAttachmentClientActor
[GitHub] flinkbot edited a comment on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor
flinkbot edited a comment on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor URL: https://github.com/apache/flink/pull/7637#issuecomment-459738301 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @GJL [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @GJL [committer] * ❔ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @GJL [committer] * ✅ 5. Overall code [quality] is good. - Approved by @GJL [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] GJL commented on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor
GJL commented on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor URL: https://github.com/apache/flink/pull/7637#issuecomment-462359647 @flinkbot approve all
[GitHub] GJL commented on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor
GJL commented on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor URL: https://github.com/apache/flink/pull/7637#issuecomment-462359456 LGTM, merging.
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255543476 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; +import java.util.Set; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and + * falls back to using location preference hints if there is no previous allocation. 
+ */ +public enum PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy { Review comment: There has been a unit test for the local recovery fix since 1.7; it was not yet wired to the new code and in fact used an outdated method. I fixed the wiring, slightly extended the test, and also reuse a subset of it for `LocationPreferenceSlotSelection`.
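The Javadoc under review describes a strategy that prefers the slot of a previous allocation and falls back to location preference hints when there is none. A minimal sketch of that selection order follows, using hypothetical simplified types (`SlotCandidate`, string allocation IDs and host names). The real `SlotSelectionStrategy`, `SlotProfile` and `SlotInfo` interfaces look different, and the final "take any slot" step is an assumption rather than something stated in the review.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.Set;

/** Hypothetical simplified slot description; not Flink's SlotInfo. */
final class SlotCandidate {
    final String allocationId;
    final String host;

    SlotCandidate(String allocationId, String host) {
        this.allocationId = allocationId;
        this.host = host;
    }
}

/** Sketch of "previous allocation first, then location preference". */
final class PreviousAllocationFirst {
    private PreviousAllocationFirst() {}

    static Optional<SlotCandidate> select(
            Collection<SlotCandidate> available,
            Set<String> previousAllocationIds,
            Set<String> preferredHosts) {
        // 1. Reuse the exact slot of a previous allocation (this is what local recovery relies on).
        for (SlotCandidate slot : available) {
            if (previousAllocationIds.contains(slot.allocationId)) {
                return Optional.of(slot);
            }
        }
        // 2. Fall back to a slot on one of the preferred hosts.
        for (SlotCandidate slot : available) {
            if (preferredHosts.contains(slot.host)) {
                return Optional.of(slot);
            }
        }
        // 3. Assumption: otherwise take any available slot, if there is one.
        return available.stream().findFirst();
    }
}
```

Keeping the two lookups as separate passes makes the priority explicit: a previous allocation always wins over a merely local slot, which mirrors the "falls back to" wording in the Javadoc.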
[jira] [Closed] (FLINK-11508) Remove invalid test AkkaJobManagerRetrieverTest
[ https://issues.apache.org/jira/browse/FLINK-11508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-11508. Resolution: Fixed > Remove invalid test AkkaJobManagerRetrieverTest > --- > > Key: FLINK-11508 > URL: https://issues.apache.org/jira/browse/FLINK-11508 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Remove invalid test AkkaJobManagerRetrieverTest
[jira] [Updated] (FLINK-11508) Remove invalid test AkkaJobManagerRetrieverTest
[ https://issues.apache.org/jira/browse/FLINK-11508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-11508: - Description: Remove invalid test AkkaJobManagerRetrieverTest > Remove invalid test AkkaJobManagerRetrieverTest > --- > > Key: FLINK-11508 > URL: https://issues.apache.org/jira/browse/FLINK-11508 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Remove invalid test AkkaJobManagerRetrieverTest
[jira] [Reopened] (FLINK-11508) Remove invalid test AkkaJobManagerRetrieverTest
[ https://issues.apache.org/jira/browse/FLINK-11508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reopened FLINK-11508: -- > Remove invalid test AkkaJobManagerRetrieverTest > --- > > Key: FLINK-11508 > URL: https://issues.apache.org/jira/browse/FLINK-11508 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Closed] (FLINK-11507) Remove invalid test JobClientActorTest
[ https://issues.apache.org/jira/browse/FLINK-11507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-11507. Resolution: Fixed Fixed via e6b5eda7359cd1fd2c58f5e33b844c4ea061294a > Remove invalid test JobClientActorTest > -- > > Key: FLINK-11507 > URL: https://issues.apache.org/jira/browse/FLINK-11507 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > As discussed in FLINK-11146, {{JobClient}} and {{JobClientActor}} are static > helper classes that bridge the non-actor code and the {{JobManager}}. > In the FLIP-6 code base we will eventually converge on a {{NewClusterClient}}. Thus > this test is invalid.
[GitHub] asfgit closed pull request #7633: [FLINK-11508] Remove invalid test AkkaJobManagerRetrieverTest
asfgit closed pull request #7633: [FLINK-11508] Remove invalid test AkkaJobManagerRetrieverTest URL: https://github.com/apache/flink/pull/7633
[GitHub] asfgit closed pull request #7635: [FLINK-11507] Remove invalid test JobClientActorTest
asfgit closed pull request #7635: [FLINK-11507] Remove invalid test JobClientActorTest URL: https://github.com/apache/flink/pull/7635
[jira] [Closed] (FLINK-11508) Remove invalid test AkkaJobManagerRetrieverTest
[ https://issues.apache.org/jira/browse/FLINK-11508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-11508. Resolution: Fixed Fixed via 6ae2ed40830bd3bf4c94b0038c373f25c5cdba7b > Remove invalid test AkkaJobManagerRetrieverTest > --- > > Key: FLINK-11508 > URL: https://issues.apache.org/jira/browse/FLINK-11508 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h
[jira] [Updated] (FLINK-10834) TableAPI flatten() calculated value error
[ https://issues.apache.org/jira/browse/FLINK-10834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10834: Fix Version/s: (was: 1.7.2) 1.7.3 > TableAPI flatten() calculated value error > - > > Key: FLINK-10834 > URL: https://issues.apache.org/jira/browse/FLINK-10834 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.7.3 > > > We have a UDF as follows: > {code:java} > object FuncRow extends ScalarFunction { > def eval(v: Int): Row = { > val version = "" + new Random().nextInt() > val row = new Row(3) > row.setField(0, version) > row.setField(1, version) > row.setField(2, version) > row > } > override def isDeterministic: Boolean = false > override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = > Types.ROW(Types.STRING, Types.STRING, Types.STRING) > } > {code} > Run the following query: > {code:sql} > val data = new mutable.MutableList[(Int, Long, String)] > data.+=((1, 1L, "Hi")) > val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b,'c) > .select(FuncRow('a).flatten()).as('v1, 'v2, 'v3) > {code} > The result is: -1189206469,-151367792,1988676906 > The user expects v1 == v2 == v3, because all three fields come from the same row. > It looks like the real cause is that the generated code does not reuse the result of the function call, so the non-deterministic UDF is evaluated once per flattened field.
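To make the suspected cause concrete, here is a plain-Java sketch contrasting the two shapes generated code could take. `nondeterministicRow` is a hypothetical stand-in for `FuncRow.eval`, and neither method is actual Flink codegen output; the point is only that a non-deterministic call must be evaluated once and its result reused before the fields are projected.

```java
import java.util.Random;
import java.util.function.Supplier;

/** Illustrates why a non-deterministic call must be evaluated once before flattening. */
final class FlattenReuse {
    private FlattenReuse() {}

    /** Hypothetical stand-in for FuncRow.eval: three fields that share one random value. */
    static String[] nondeterministicRow(Random random) {
        String version = "" + random.nextInt();
        return new String[] {version, version, version};
    }

    /** Buggy shape: the call is re-evaluated for every flattened field. */
    static String[] flattenWithoutReuse(Supplier<String[]> call) {
        return new String[] {call.get()[0], call.get()[1], call.get()[2]};
    }

    /** Fixed shape: evaluate once, keep the result in a local, then project the fields. */
    static String[] flattenWithReuse(Supplier<String[]> call) {
        String[] row = call.get();
        return new String[] {row[0], row[1], row[2]};
    }
}
```

With `flattenWithReuse`, the three output fields always come from one evaluation and are therefore equal; `flattenWithoutReuse` calls the supplier three times, which matches the three different values reported in the issue.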
[jira] [Updated] (FLINK-11260) Bump Janino compiler dependency
[ https://issues.apache.org/jira/browse/FLINK-11260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11260: Fix Version/s: (was: 1.7.2) 1.7.3 > Bump Janino compiler dependency > --- > > Key: FLINK-11260 > URL: https://issues.apache.org/jira/browse/FLINK-11260 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.7.1 >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3 > > Time Spent: 10m > Remaining Estimate: 0h > > Bump the Janino dependency: > http://janino-compiler.github.io/janino/changelog.html
[jira] [Updated] (FLINK-9228) log details about task fail/task manager is shutting down
[ https://issues.apache.org/jira/browse/FLINK-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9228: --- Fix Version/s: (was: 1.7.2) 1.7.3 > log details about task fail/task manager is shutting down > - > > Key: FLINK-9228 > URL: https://issues.apache.org/jira/browse/FLINK-9228 > Project: Flink > Issue Type: Improvement > Components: Logging >Affects Versions: 1.4.2 >Reporter: makeyang >Assignee: makeyang >Priority: Minor > Fix For: 1.6.4, 1.7.3, 1.8.0 > > > condition: > flink version:1.4.2 > jdk version:1.8.0.20 > linux version:3.10.0 > problem description: > one of my task managers dropped out of the cluster; I checked its log and found the following: > 2018-04-19 22:34:47,441 INFO org.apache.flink.runtime.taskmanager.Task > > - Attempting to fail task externally Process (115/120) > (19d0b0ce1ef3b8023b37bdfda643ef44). > 2018-04-19 22:34:47,441 INFO org.apache.flink.runtime.taskmanager.Task > > - Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44) switched from RUNNING > to FAILED. > java.lang.Exception: TaskManager is shutting down.
> at > org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:220) > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) > at > org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:121) > > at > akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) > > at > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) > at akka.actor.ActorCell.terminate(ActorCell.scala:374) > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > suggestion: > # short-term suggestions: > ## log why the task failed: did it receive some event from the job manager, could it not connect to the job manager, or did an operator throw an exception? The clearer, the better. > ## log why the task manager is shutting down: did it receive some event from the job manager, could it not connect to the job manager, or did an operator exception make recovery impossible? > # long-term suggestions: > ## clearly define the state machine of a Flink node. If nothing happens, the node should stay in its current state: if it is processing events, it should keep processing events. In other words, if its state changes from processing events to canceled, some event must have happened. 
> ## clearly define the events that can cause a node's state to change, such as user cancellation, operator exceptions, heartbeat timeouts, etc. > ## clearly log each state change and the event that caused it > ## show event details (time, node, event, state change, etc.) in the web UI
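The long-term suggestion asks for an explicit node state machine in which every state change is recorded together with the event that caused it. A minimal sketch of that idea, assuming hypothetical states, transitions and cause strings (Flink's actual `ExecutionState` has more states and is not driven this way). Transitions are appended to an in-memory list instead of a real logger so that the behaviour is easy to check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

/** Hypothetical task state machine that records every transition with its cause. */
final class TaskStateMachine {
    enum State { RUNNING, CANCELING, CANCELED, FAILED, FINISHED }

    // Allowed transitions; anything not listed here is rejected.
    private static final Map<State, EnumSet<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.RUNNING, EnumSet.of(State.CANCELING, State.FAILED, State.FINISHED));
        ALLOWED.put(State.CANCELING, EnumSet.of(State.CANCELED, State.FAILED));
        ALLOWED.put(State.CANCELED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.FINISHED, EnumSet.noneOf(State.class));
    }

    private State state = State.RUNNING;
    // Stand-in for a logger: one entry per state change, including its cause.
    private final List<String> transitions = new ArrayList<>();

    State state() {
        return state;
    }

    List<String> transitions() {
        return Collections.unmodifiableList(transitions);
    }

    /** Changes state only through an allowed transition, always recording why. */
    void transition(State target, String cause) {
        if (!ALLOWED.get(state).contains(target)) {
            throw new IllegalStateException(state + " -> " + target + " is not allowed");
        }
        transitions.add(state + " -> " + target + " caused by: " + cause);
        state = target;
    }
}
```

Because the state can only change through `transition`, a log line with a cause exists for every change, which is the property the reporter is asking for: a node never silently leaves the state it is in.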
[jira] [Updated] (FLINK-10875) Add `toTableWithTimestamp` method in `DataStreamConversions`
[ https://issues.apache.org/jira/browse/FLINK-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10875: Fix Version/s: (was: 1.7.2) 1.7.3 > Add `toTableWithTimestamp` method in `DataStreamConversions` > > > Key: FLINK-10875 > URL: https://issues.apache.org/jira/browse/FLINK-10875 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Minor > Fix For: 1.7.3 > > > Currently we convert a `DataStream` to a `Table` by > `DataStreamConversions#toTable`, e.g.: > {code:java} > // Without TimeAttribute > ... > val stream = env.fromCollection(...) > val tab = stream.toTable(tEnv, 'a, 'b, 'c) > val result = tab.select('a, 'b) > > // With TimeAttribute > ... > val stream = env.fromCollection(...).assignTimestampsAndWatermarks(...) > val tab = stream.toTable(tEnv, 'a, 'b, 'c, 'ts.rowtime) > val result = tab.window(Session withGap 5.milli on 'ts as 'w) > ...{code} > I think the fieldNames parameter of the `toTable` method is reasonable for the conversion without a time attribute, because the field names actually correspond to the fields of the physical table. But in the conversion with a time attribute, the time attribute column is silently added to the table, which feels rather magical. So I recommend adding a method that lets the user explicitly declare the time attribute that is added to the physical table: `toTableWithTimestamp`, which names the time attribute column after the user's input and derives its kind from the TimeCharacteristic, e.g.: > {code:java} > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > ... > val table = stream.toTableWithTimestamp(tEnv, 'count, 'size, 'name, 'ts) > .window(Tumble over 2.rows on 'ts as 'w) > ... > {code} > In the example above, Flink will mark `ts` as a `RowtimeAttribute`. > What do you think?
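The core of the proposal is that the user names the time attribute column and its kind follows from the stream's time characteristic. A plain-Java sketch of just that derivation is below. `toTableWithTimestamp` does not exist in Flink; the helper `fieldExpressions` and the local `TimeCharacteristic` enum (reduced to two values) are hypothetical stand-ins, and the processing-time branch is an assumption, since the issue only shows the event-time case.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: derives the field expressions the proposed method would imply. */
final class TimeAttributeNaming {
    /** Local stand-in for Flink's TimeCharacteristic, reduced to two values for brevity. */
    enum TimeCharacteristic { ProcessingTime, EventTime }

    private TimeAttributeNaming() {}

    /**
     * Returns the physical fields followed by the user-named time attribute,
     * marked as rowtime under event time and (by assumption) as proctime otherwise.
     */
    static List<String> fieldExpressions(
            List<String> physicalFields, String timeAttribute, TimeCharacteristic characteristic) {
        List<String> result = new ArrayList<>(physicalFields);
        String suffix = characteristic == TimeCharacteristic.EventTime ? ".rowtime" : ".proctime";
        result.add(timeAttribute + suffix);
        return result;
    }
}
```

For the example in the issue, the fields `count`, `size`, `name` plus the time attribute `ts` under `EventTime` would correspond to `'count, 'size, 'name, 'ts.rowtime` in today's `toTable` call, which is exactly the "silent" column the proposal wants users to declare explicitly.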
[jira] [Updated] (FLINK-11259) Bump Zookeeper dependency to 3.4.13
[ https://issues.apache.org/jira/browse/FLINK-11259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11259: Fix Version/s: (was: 1.7.2) 1.7.3 > Bump Zookeeper dependency to 3.4.13 > --- > > Key: FLINK-11259 > URL: https://issues.apache.org/jira/browse/FLINK-11259 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.7.1 >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3 > > Time Spent: 20m > Remaining Estimate: 0h > > Bump Zookeeper to 3.4.13 > https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html
[jira] [Updated] (FLINK-11250) fix thread leaked when StreamTask switched from DEPLOYING to CANCELING
[ https://issues.apache.org/jira/browse/FLINK-11250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11250: Fix Version/s: (was: 1.7.2) 1.7.3 > fix thread leaked when StreamTask switched from DEPLOYING to CANCELING > -- > > Key: FLINK-11250 > URL: https://issues.apache.org/jira/browse/FLINK-11250 > Project: Flink > Issue Type: Bug > Components: Local Runtime, Streaming >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Since Flink 1.5.x, the streamRecordWriters are created in StreamTask's constructor, which starts the OutputFlusher daemon thread. So when a task switches from DEPLOYING to CANCELING, the daemon thread is leaked. > > *reproducible example* > {code:java} > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000); > env > .addSource(new SourceFunction<String>() { > @Override > public void run(SourceContext<String> ctx) throws Exception { > for (int i = 0; i < 1; i++) { > Thread.sleep(100); > ctx.collect("data " + i); > } > } > @Override > public void cancel() { > } > }) > .addSink(new RichSinkFunction<String>() { > @Override > public void open(Configuration parameters) throws Exception { > System.out.println(1 / 0); > } > @Override > public void invoke(String value, Context context) throws > Exception { > } > }).setParallelism(2); > env.execute(); > }{code} > *some useful log* > {code:java} > 2019-01-02 03:03:47.525 [thread==> jobmanager-future-thread-2] > executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) > (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CREATED to SCHEDULED.
> 2019-01-02 03:03:47.526 [thread==> flink-akka.actor.default-dispatcher-5] > slotpool.SlotPool#allocateSlot:326 Received slot request > [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] for task: Attempt #1 > (Source: Custom Source (1/1)) @ (unassigned) - [SCHEDULED] > 2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] > slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot > [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] in slot > [SlotRequestId{6d7f0173c1d48e5559f6a14b080ee817}]. > 2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] > slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create > single task slot [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] in multi > task slot [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] for group > bc764cd8ddf7a0cff126f51c16239658. > 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] > slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create > single task slot [SlotRequestId{8a877431375df8aeadb2fd845cae15fc}] in multi > task slot [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] for group > 0a448493b4782967b150582570326227. > 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] > slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot > [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] in slot > [SlotRequestId{dbf5c9fa39f1e5a0b34a4a8c10699ee5}]. > 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] > slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create > single task slot [SlotRequestId{5929c12b52dccee682f86afbe1cff5cf}] in multi > task slot [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] for group > 0a448493b4782967b150582570326227. 
> 2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] > executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) > (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from SCHEDULED to DEPLOYING. > 2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] > executiongraph.Execution#deploy:576 Deploying Source: Custom Source (1/1) > (attempt #1) to localhost > 2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] > state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:162 > Registered new local state store with configuration >
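FLINK-11250 describes a thread that is started from a constructor, so a cancellation that arrives before the task ever runs leaves the thread behind. The following is a minimal plain-Java sketch of that pattern and of also closing the resource on the cancel path. FlushingWriter and SketchTask are hypothetical stand-ins, not Flink's StreamTask or record writers, and this is not necessarily how the issue was fixed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a record writer that, like the writers described in
// FLINK-11250, starts a periodic flusher thread as soon as it is constructed.
final class FlushingWriter implements AutoCloseable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread flusher;

    FlushingWriter(long flushIntervalMillis) {
        flusher = new Thread(() -> {
            while (running.get()) {
                try {
                    Thread.sleep(flushIntervalMillis);
                    // A real writer would flush its buffered output here.
                } catch (InterruptedException e) {
                    return; // interrupted (e.g. by close()): stop flushing
                }
            }
        }, "OutputFlusher-sketch");
        flusher.setDaemon(true);
        flusher.start();
    }

    boolean isFlusherAlive() {
        return flusher.isAlive();
    }

    // Idempotent: safe to call from both the cancel path and the normal exit path.
    @Override
    public void close() {
        running.set(false);
        flusher.interrupt();
        try {
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical task whose constructor creates the writer (and therefore the thread).
final class SketchTask {
    private final FlushingWriter writer = new FlushingWriter(10);

    void invoke() {
        try {
            // ... process records ...
        } finally {
            writer.close();
        }
    }

    // May be called before invoke() ever runs (e.g. while still DEPLOYING).
    // Without this close(), the flusher thread would outlive the task.
    void cancel() {
        writer.close();
    }

    boolean flusherAlive() {
        return writer.isFlusherAlive();
    }
}
```

The key design point is that whoever starts a thread in a constructor must also own stopping it on every exit path, including a cancellation that happens before the normal lifecycle begins.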
[jira] [Updated] (FLINK-11116) Overwrite outdated in-progress files in StreamingFileSink.
[ https://issues.apache.org/jira/browse/FLINK-11116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11116: Fix Version/s: (was: 1.7.2) 1.7.3 > Overwrite outdated in-progress files in StreamingFileSink. > -- > > Key: FLINK-11116 > URL: https://issues.apache.org/jira/browse/FLINK-11116 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3 > > Time Spent: 10m > Remaining Estimate: 0h > > In order to guarantee exactly-once semantics, the streaming file sink > implements a two-phase commit protocol when writing files to the filesystem. > Initially data is written to in-progress files. These files are then put into > "pending" state when they are completed (based on the rolling policy), and > they are finally committed when the checkpoint that put them in the "pending" > state is acknowledged as complete. > The above shows that if we have: > 1) checkpoints A, B, C coming > 2) checkpoint A being acknowledged and > 3) failure > Then we may have files that do not belong to any checkpoint (because B and C > were not considered successful). These files are currently not cleaned up. > In order to reduce the number of such files created, we removed the random > suffix from in-progress temporary files, so that the next in-progress file > that is opened for this part overwrites them.
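The change described above relies on in-progress file names being deterministic. A minimal sketch, assuming a hypothetical `.part-<subtask>-<counter>.inprogress` naming scheme (the real StreamingFileSink naming may differ): because the name carries no random suffix, re-opening the same part after a failure truncates and overwrites the stale file instead of leaving another orphan behind.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class InProgressFiles {
    // Hypothetical naming scheme: no random suffix, so the path depends only on
    // the subtask index and the part counter.
    static Path inProgressPath(Path bucketDir, int subtaskIndex, long partCounter) {
        return bucketDir.resolve(".part-" + subtaskIndex + "-" + partCounter + ".inprogress");
    }

    // Opens (or re-opens after a failure) the in-progress file for a part.
    // TRUNCATE_EXISTING discards whatever an earlier, failed attempt left behind.
    static OutputStream open(Path bucketDir, int subtaskIndex, long partCounter) throws IOException {
        Path path = inProgressPath(bucketDir, subtaskIndex, partCounter);
        return Files.newOutputStream(path,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE);
    }
}
```

With a random suffix, each restart would create a fresh file next to the old one; with a deterministic name, the stale data is simply replaced.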
[jira] [Updated] (FLINK-11107) [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured
[ https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11107: Fix Version/s: (was: 1.7.2) 1.7.3 > [state] Avoid memory stateBackend to create arbitrary folders under HA path > when no checkpoint path configured > -- > > Key: FLINK-11107 > URL: https://issues.apache.org/jira/browse/FLINK-11107 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.2, 1.7.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.3 > > > Currently, the memory state backend creates a folder named with a random UUID > under the HA directory if no checkpoint path is configured (the logic is > located in {{StateBackendLoader#fromApplicationOrConfigOrDefault}}). > However, the default memory state backend is created not only on the JM > side but also on each TaskManager, which means many folders with > random UUIDs are created under the HA directory. This can result in an exception > like: > {noformat} > The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 > items=1048576{noformat} > If this happens, no new jobs can be submitted until those > directories are cleaned up manually.
[jira] [Updated] (FLINK-11061) Add travis profile that would run on each commit with scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-11061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11061: Fix Version/s: (was: 1.7.2) 1.7.3 > Add travis profile that would run on each commit with scala 2.12 > > > Key: FLINK-11061 > URL: https://issues.apache.org/jira/browse/FLINK-11061 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.7.0, 1.8.0 >Reporter: Dawid Wysakowicz >Priority: Major > Fix For: 1.7.3, 1.8.0 > > > In Flink 1.7.0 we introduced support for Scala 2.12, so we should add a > Travis profile that checks that we do not break Scala 2.12 compatibility.
[jira] [Updated] (FLINK-11055) Allow Queryable State to be transformed on the TaskManager before being returned to the client
[ https://issues.apache.org/jira/browse/FLINK-11055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-11055: Fix Version/s: (was: 1.7.2) 1.7.3 > Allow Queryable State to be transformed on the TaskManager before being > returned to the client > -- > > Key: FLINK-11055 > URL: https://issues.apache.org/jira/browse/FLINK-11055 > Project: Flink > Issue Type: New Feature > Components: Queryable State >Reporter: Galen Warren >Priority: Major > Fix For: 1.7.3 > > > The proposal here is to enhance the way Queryable State works to allow for > the state object to be transformed on the TaskManager before being returned > to the client. As an example, if some MapState were made queryable, such > a transform might look up a specific key in the map and return its > corresponding value, resulting in only that value being returned to the > client instead of the entire map. This could be useful in cases where the > client only wants a portion of the state and the state is large (this is my > use case). > At a high level, I think this could be accomplished by adding an (optional) > serializable Function into KvStateRequest (and related > classes?) and having that transform be applied in the QueryableStateServer > (or QueryableStateClientProxy?). I expect some additional TypeInformation > would also have to be supplied/used in places. It should be doable in a > backwards-compatible way such that if the client does not specify a transform > it works exactly as it does now. > Would there be any interest in a PR for this? This would help me with > something I'm currently working on and I'd be willing to take a crack at it. > If there is interest, I'll be happy to do some more research to come up with > a more concrete proposal. > Thanks for Flink - it's great!
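The proposal above can be sketched in plain Java. Everything here is hypothetical: StateTransform, TransformingStateRequest and SketchStateServer are not part of Flink's Queryable State API, and a Map stands in for MapState. It only shows an optional, serializable transform travelling with the request and being applied on the server side, with a null transform keeping the current return-the-whole-state behavior.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical serializable function type that a client could ship with a request.
interface StateTransform<S, R> extends Function<S, R>, Serializable {}

// Hypothetical request: the transform is optional; null means "return the whole state".
final class TransformingStateRequest<S, R> implements Serializable {
    final String stateName;
    final StateTransform<S, R> transform; // may be null, for backwards compatibility

    TransformingStateRequest(String stateName, StateTransform<S, R> transform) {
        this.stateName = Objects.requireNonNull(stateName);
        this.transform = transform;
    }
}

// Hypothetical server-side handler: applies the transform before the response is
// serialized, so only the transformed result would cross the network.
final class SketchStateServer {
    private final Map<String, Object> registeredState = new HashMap<>();

    void register(String name, Object state) {
        registeredState.put(name, state);
    }

    @SuppressWarnings("unchecked")
    <S, R> Object handle(TransformingStateRequest<S, R> request) {
        S state = (S) registeredState.get(request.stateName);
        return request.transform == null ? state : request.transform.apply(state);
    }
}
```

A client interested in a single key would send a transform such as `m -> m.get("b")` and receive one value instead of the full map.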
[jira] [Updated] (FLINK-10954) Hardlink from files of previous local stored state might cross devices
[ https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-10954: Fix Version/s: (was: 1.7.2) 1.7.3 > Hardlink from files of previous local stored state might cross devices > -- > > Key: FLINK-10954 > URL: https://issues.apache.org/jira/browse/FLINK-10954 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.2 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Fix For: 1.6.4, 1.7.3, 1.8.0 > > > Currently, local recovery's base directories are initialized from > '{{io.tmp.dirs}}' if parameter '{{taskmanager.state.local.root-dirs}}' is not > set. In a YARN environment, the tmp dirs are replaced by YARN's '{{LOCAL_DIRS}}', > which might consist of directories on different devices, such as > /dump/1/nm-local-dir, /dump/2/nm-local-dir. The local directory for RocksDB > is initialized from IOManager's spillingDirectories, which might be located on a > different device from local recovery's folder. However, hard links between > different devices are not allowed, and an exception like the one below is thrown: > {code:java} > java.nio.file.FileSystemException: target -> souce: Invalid cross-device link > {code}
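One possible mitigation for the cross-device case is to attempt the hard link and fall back to a copy when the file system rejects it. The sketch below uses only java.nio.file; the class name LinkOrCopy is hypothetical, and this is not necessarily the approach taken in FLINK-10954. A copy costs disk space and time, so it is only a fallback.

```java
import java.io.IOException;
import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class LinkOrCopy {
    /**
     * Tries to create a hard link at {@code link} pointing to {@code existing}.
     * If the file system refuses (e.g. "Invalid cross-device link" when the two
     * paths are on different devices, or no hard-link support), copies instead.
     * Returns true if a hard link was created, false if the file was copied.
     */
    static boolean linkOrCopy(Path link, Path existing) throws IOException {
        try {
            Files.createLink(link, existing);
            return true;
        } catch (UnsupportedOperationException | FileSystemException e) {
            // Files.copy still fails if `link` already exists, so a
            // FileAlreadyExistsException from createLink is not silently masked.
            Files.copy(existing, link, StandardCopyOption.COPY_ATTRIBUTES);
            return false;
        }
    }
}
```

On a single device the hard link succeeds and nothing is copied; only the cross-device path pays for a full copy.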
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255512747 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ## @@ -111,7 +112,8 @@ public void testScheduleImmediately() throws Exception { assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots()); } - + + @Ignore Review comment: Yes, it could still make sense to test, even without the concurrency aspect. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255510916 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java ## @@ -109,16 +111,15 @@ public void testPayloadAssignmentAfterRelease() { assertThat(singleLogicalSlot.tryAssignPayload(dummyPayload), is(false)); } + @Ignore Review comment: I think it can be removed: it no longer makes sense to test, because `returnLogicalSlot` no longer returns a future.
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r255508677 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception { testSerializationRoundTrip(originalRecords, segmentSize); } + /** +* Non-spanning, deserialization reads one byte too many and succeeds - failure report comes +* from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + 32 * 1024); + } + + /** +* Non-spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchNonSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength, + isA(IndexOutOfBoundsException.class)); + } + + /** +* Spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> serializedLength - 1, + isA(EOFException.class)); + } + + /** +* Spanning, deserialization reads one byte too many and fails. +*/ + @Test + public void testHandleDeserializingTooMuchSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingTooMuch.getValue(), + (serializedLength) -> 1, + isA(EOFException.class)); + } + + /** +* Spanning, spilling, deserialization reads one byte too many. 
+*/ + @Test + public void testHandleDeserializingTooMuchSpanningLargeRecord() throws Exception { + testHandleWrongDeserialization( + LargeObjectTypeDeserializingTooMuch.getRandom(), + 32 * 1024, + isA(EOFException.class)); + } + + /** +* Non-spanning, deserialization forgets to read one byte - failure report comes from an +* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughNonSpanning() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + 32 * 1024); + } + + /** +* Spanning, deserialization forgets to read one byte - failure report comes from an additional +* check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanning1() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + (serializedLength) -> serializedLength - 1); + } + + /** +* Spanning, serialization length is 17 (including headers), deserialization forgets to read one +* byte - failure report comes from an additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanning2() throws Exception { + testHandleWrongDeserialization( + DeserializingNotEnough.getValue(), + 1); + } + + /** +* Spanning, spilling, deserialization forgets to read one byte - failure report comes from an +* additional check in {@link SpillingAdaptiveSpanningRecordDeserializer}. +*/ + @Test + public void testHandleDeserializingNotEnoughSpanningLargeRecord() throws Exception { + testHandleWrongDeserialization( + LargeObjectTypeDeserializingNotEnough.getRandom(), + 32 * 1024); + } + + private void testHandleWrongDeserialization( + WrongDeserializationValue testValue, + IntFunction segmentSizeProvider, + Matcher expectedCause) throws Exception { + expectedException.expectCause(expectedCause); + testHandleWrongDeserialization(testValue,
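The tests above expect an additional check to catch a deserializer that reads too little (or too much) of a record. A simplified sketch of such a check for the non-spanning case, assuming a hypothetical `[int length][payload]` framing in a single ByteBuffer: the record reader sees only the payload slice, over-reads surface as BufferUnderflowException, and unread bytes are detected afterwards. It does not model spanning or spilling the way SpillingAdaptiveSpanningRecordDeserializer does, and LengthCheckedDeserializer is a hypothetical name.

```java
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.function.Function;

final class LengthCheckedDeserializer {
    /**
     * Reads one record framed as [int length][payload], hands only the payload to
     * {@code readRecord}, and verifies that exactly {@code length} bytes were consumed.
     */
    static <T> T readRecord(ByteBuffer framed, Function<ByteBuffer, T> readRecord) throws IOException {
        int length = framed.getInt();
        if (length < 0 || length > framed.remaining()) {
            throw new IOException("Corrupt record length " + length);
        }
        ByteBuffer payload = framed.slice();
        payload.limit(length); // the reader cannot see past the end of this record

        T record;
        try {
            record = readRecord.apply(payload);
        } catch (BufferUnderflowException e) {
            throw new IOException("Deserializer read past the end of the record (length " + length + ")", e);
        }
        if (payload.hasRemaining()) {
            throw new IOException("Deserializer left " + payload.remaining() + " of " + length + " bytes unread");
        }
        framed.position(framed.position() + length); // advance to the next record
        return record;
    }
}
```

Limiting the reader to a slice turns "reads one byte too many" into an immediate error instead of silently consuming the next record's header.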
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255505919 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ## @@ -97,34 +94,30 @@ private TestingResourceManagerGateway resourceManagerGateway; + private ComponentMainThreadExecutor mainThreadExecutor = + TestingComponentMainThreadExecutorServiceAdapter.forMainThread(); Review comment: It was already no longer concurrent, so yes, we can simplify.
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255502651 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java ## @@ -69,33 +62,18 @@ import static org.junit.Assert.fail; /** - * Tests for the SlotPool using a proper RPC setup. + * Tests for the SlotPoolImpl interactions. */ -public class SlotPoolRpcTest extends TestLogger { - - private static RpcService rpcService; - - private static final Time timeout = Time.seconds(10L); +public class SlotPoolInteractionsTest extends TestLogger { Review comment: I think it is still somewhat required because the tests involve timeouts, and the simple direct main thread executor will always run into a violation if it is entered through the scheduled executor of the timeout.
[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r252367679 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ## @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws Exception {
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255498671 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java ## @@ -19,54 +19,55 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; -import org.apache.flink.runtime.rpc.RpcService; import org.junit.rules.ExternalResource; import javax.annotation.Nonnull; +import java.util.HashMap; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; /** - * {@link ExternalResource} which provides a {@link SlotPool}. + * {@link ExternalResource} which provides a {@link SlotPoolImpl}. */ public class SlotPoolResource extends ExternalResource { Review comment: We could, but it is still rather convenient to have it in tests. I would not drop it just for that purpose, because it still seems to save lines of test code.
[jira] [Assigned] (FLINK-11529) Support Chinese Documents for Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-11529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-11529: --- Assignee: Jark Wu > Support Chinese Documents for Apache Flink > -- > > Key: FLINK-11529 > URL: https://issues.apache.org/jira/browse/FLINK-11529 > Project: Flink > Issue Type: New Feature > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > > This is an umbrella issue for tracking full Chinese support for the Flink > documentation (http://ci.apache.org/projects/flink/flink-docs-master/). > A more detailed description can be found in the proposal doc: > https://docs.google.com/document/d/1R1-uDq-KawLB8afQYrczfcoQHjjIhq6tvUksxrfhBl0/edit#
[jira] [Updated] (FLINK-4330) Consider removing min()/minBy()/max()/maxBy()/sum() utility methods from the DataStream API
[ https://issues.apache.org/jira/browse/FLINK-4330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valeria Vasylieva updated FLINK-4330: - Attachment: (was: 1.url) > Consider removing min()/minBy()/max()/maxBy()/sum() utility methods from the > DataStream API > --- > > Key: FLINK-4330 > URL: https://issues.apache.org/jira/browse/FLINK-4330 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Robert Metzger >Priority: Major > Fix For: 2.0.0 > > > I think we should consider removing the min()/minBy()/max()/maxBy()/sum() > utility methods from the DataStream API. They make the maintenance of the > code unnecessarily complicated, and don't add enough value for the users (who > cannot access the window metadata). > If we are keeping the methods, we should consolidate the min/minBy methods: > the difference is subtle, and minBy can subsume the min method.
[jira] [Updated] (FLINK-4330) Consider removing min()/minBy()/max()/maxBy()/sum() utility methods from the DataStream API
[ https://issues.apache.org/jira/browse/FLINK-4330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valeria Vasylieva updated FLINK-4330: - Attachment: 1.url > Consider removing min()/minBy()/max()/maxBy()/sum() utility methods from the > DataStream API > --- > > Key: FLINK-4330 > URL: https://issues.apache.org/jira/browse/FLINK-4330 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Robert Metzger >Priority: Major > Fix For: 2.0.0 > > > I think we should consider removing the min()/minBy()/max()/maxBy()/sum() > utility methods from the DataStream API. They make the maintenance of the > code unnecessarily complicated, and don't add enough value for the users (who > cannot access the window metadata). > If we are keeping the methods, we should consolidate the min/minBy methods: > the difference is subtle, and minBy can subsume the min method.
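The min/minBy difference mentioned above can be illustrated in plain Java. This is not the DataStream implementation; Reading and MinVsMinBy are hypothetical. Here min yields only the smallest value of a field, while minBy yields the whole element that carries it. Since min can be read off minBy's result, minBy subsumes min.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical element type with a key and a numeric field.
final class Reading {
    final String sensor;
    final int temperature;

    Reading(String sensor, int temperature) {
        this.sensor = sensor;
        this.temperature = temperature;
    }
}

final class MinVsMinBy {
    // "min"-style: only the smallest value of the field is returned.
    static <T> int min(List<T> elements, ToIntFunction<T> field) {
        return elements.stream().mapToInt(field).min().orElseThrow(IllegalArgumentException::new);
    }

    // "minBy"-style: the whole element holding the smallest value is returned.
    static <T> T minBy(List<T> elements, ToIntFunction<T> field) {
        return elements.stream().min(Comparator.comparingInt(field)).orElseThrow(IllegalArgumentException::new);
    }
}
```

For readings (a, 30), (b, 10), (c, 20), min on temperature gives 10, while minBy gives the element (b, 10), from which 10 can also be taken.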
[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased) URL: https://github.com/apache/flink/pull/7662#discussion_r255495062 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ## @@ -66,11 +74,13 @@ public void testExecutionGraphRestartTimeMetric() throws JobException, IOExcepti jobVertex.setInvokableClass(NoOpInvokable.class); JobGraph jobGraph = new JobGraph("Test Job", jobVertex); + Scheduler scheduler = mock(Scheduler.class); Review comment: That implementation is already there and used, so this line can just be deleted. It is a leftover from the rebase.