[GitHub] Guibo-Pan commented on issue #6773: Add reverse function in Table API and SQL
Guibo-Pan commented on issue #6773: Add reverse function in Table API and SQL URL: https://github.com/apache/flink/pull/6773#issuecomment-425318708 cc @xccui @yanghua This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10340) Add Cosh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631324#comment-16631324 ] ASF GitHub Bot commented on FLINK-10340: xccui commented on a change in pull request #6700: [FLINK-10340][table] Add Cosh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6700#discussion_r221136490 ## File path: docs/dev/table/functions.md ## @@ -1274,6 +1274,17 @@ ATAN2(numeric1, numeric2) + + +{% highlight text %} +COSH(numeric) +{% endhighlight %} + + +Returns the hyperbolic cosine of numeric. Return value type is DOUBLE. Review comment: 1. This page is not a pure markdown doc. We cannot use backquote here. 2. Conventionally, we use small letters for SQL attributes and capital letters for Java/Scala attributes. 3. I don't think the return type should be marked like that. In most cases, it should be identical to the input's type (e.g., if the input is a `bigdecimal`, we should also return a `bigdecimal` instead of a fixed `double`). > Add Cosh math function supported in Table API and SQL > - > > Key: FLINK-10340 > URL: https://issues.apache.org/jira/browse/FLINK-10340 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Sergey Tsvetkov >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Implement a udf of cosh, just like in Oracle: > [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
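To make the review point about return types concrete, here is an illustrative sketch (Python, not Flink's actual Java/Scala code) of a COSH whose result type follows the input type, which is the behavior the reviewer suggests for BigDecimal inputs. The function name is hypothetical:

```python
import math
from decimal import Decimal

def cosh_any(x):
    # Hypothetical type-preserving COSH: a Decimal (BigDecimal-like) input
    # yields a Decimal result instead of always falling back to double.
    if isinstance(x, Decimal):
        # cosh(x) = (e^x + e^-x) / 2, computed in decimal arithmetic
        return (x.exp() + (-x).exp()) / 2
    return math.cosh(float(x))
```

The point being sketched: the operator picks its arithmetic based on the operand type, so precision of decimal inputs is not silently lost.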
[GitHub] Guibo-Pan opened a new pull request #6773: Add reverse function in Table API and SQL
Guibo-Pan opened a new pull request #6773: Add reverse function in Table API and SQL URL: https://github.com/apache/flink/pull/6773 ## What is the purpose of the change *This pull request adds the reverse function to the Table API and SQL* ## Brief change log - *Add reverse function in Table API and SQL* ## Verifying this change This change is already covered by existing tests, such as *ScalarFunctionsTest#testReverse*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs)
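For reference, the expected SQL-style semantics of such a REVERSE function (NULL in, NULL out; otherwise the reversed string) can be sketched as follows. This is an illustrative Python model, not the PR's actual implementation:

```python
def reverse(s):
    # SQL-style semantics: NULL (modeled as None) propagates; otherwise
    # the string is returned with its characters in reverse order.
    return None if s is None else s[::-1]
```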
[jira] [Created] (FLINK-10456) Remove org.apache.flink.api.common.time.Deadline
tison created FLINK-10456: - Summary: Remove org.apache.flink.api.common.time.Deadline Key: FLINK-10456 URL: https://issues.apache.org/jira/browse/FLINK-10456 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.7.0 Reporter: tison Assignee: tison Fix For: 1.7.0 We already have {{scala.concurrent.duration.Deadline}}, and {{org.apache.flink.api.common.time.Deadline}} is not a rich extension of it. I wonder in which situations we need a customized Deadline. If there are none, introducing a weaker alternative seems unreasonable and raises confusion. What do you think? cc [~StephanEwen] [~Zentol]
[jira] [Commented] (FLINK-10420) Create and drop view in sql client should check the view created based on the configuration.
[ https://issues.apache.org/jira/browse/FLINK-10420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631287#comment-16631287 ] vinoyang commented on FLINK-10420: -- hi [~twalthr] I think the *CREATE OR REPLACE VIEW* syntax is a good solution to this problem. The user makes the decision and understands his choice. What do you think? In addition, I have another question: why do we use Calcite to parse statements when the SQL client processes DDL statements? > Create and drop view in sql client should check the view created based on the > configuration. > > > Key: FLINK-10420 > URL: https://issues.apache.org/jira/browse/FLINK-10420 > Project: Flink > Issue Type: Bug > Components: SQL Client >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > > Currently, only the current session is checked: > {code:java} > private void callCreateView(SqlCommandCall cmdCall) { >final String name = cmdCall.operands[0]; >final String query = cmdCall.operands[1]; >//here >final String previousQuery = context.getViews().get(name); >if (previousQuery != null) { > printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS); > return; >} > {code} >
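A minimal model of the behavior under discussion, assuming a hypothetical CREATE OR REPLACE VIEW flag. This is a Python sketch of the semantics, with invented names, not the SQL client's actual Java code:

```python
class SqlClientSession:
    """Toy model of the SQL client's view registry (hypothetical names)."""

    def __init__(self):
        self.views = {}

    def create_view(self, name, query, or_replace=False):
        # Without OR REPLACE, creating an existing view is an error;
        # with it, the user explicitly opts in to overwriting.
        if name in self.views and not or_replace:
            raise ValueError("view already exists: " + name)
        self.views[name] = query

    def drop_view(self, name):
        if name not in self.views:
            raise ValueError("view does not exist: " + name)
        del self.views[name]
```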
[jira] [Updated] (FLINK-10240) Pluggable scheduling strategy for batch jobs
[ https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-10240: Affects Version/s: 1.7.0 > Pluggable scheduling strategy for batch jobs > > > Key: FLINK-10240 > URL: https://issues.apache.org/jira/browse/FLINK-10240 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Zhu Zhu >Priority: Major > Labels: scheduling > > Currently batch jobs are scheduled with the LAZY_FROM_SOURCES strategy: source > tasks are scheduled in the beginning, and other tasks are scheduled once > their input data are consumable. > However, consumable input data does not always mean the task can start working at > once. > > One example is the hash join operation, where the operator first consumes one > side (we call it the build side) to set up a table, then consumes the other side (we > call it the probe side) to do the real join work. If the probe side is started > early, it just gets stuck on back pressure, as the join operator will not > consume data from it before the build stage is done, causing a waste of > resources. > If we start the probe side task after the build stage is done, both > the build and probe sides can have more computing resources as they are > staggered. > > That's why we think a flexible scheduling strategy is needed, allowing job > owners to customize the vertex schedule order and constraints. Better > resource utilization usually means better performance.
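A pluggable strategy of the kind proposed here boils down to letting the job owner supply explicit ordering constraints (e.g. the hash join's build side before its probe side). A minimal sketch with hypothetical names, using a Kahn-style topological ordering over the constraints, not Flink's actual scheduler API:

```python
from collections import deque

def schedule_order(vertices, constraints):
    # Orders vertices so that every (before, after) constraint is
    # respected; unconstrained vertices keep their relative order.
    indegree = {v: 0 for v in vertices}
    successors = {v: [] for v in vertices}
    for before, after in constraints:
        successors[before].append(after)
        indegree[after] += 1
    ready = deque(v for v in vertices if indegree[v] == 0)
    order = []
    while ready:
        v = ready.popleft()
        order.append(v)
        for nxt in successors[v]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return order
```

With a constraint ("build", "probe"), the probe task is only released once the build task is scheduled, which is the staggering the issue argues for.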
[jira] [Comment Edited] (FLINK-4399) Add support for oversized messages
[ https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631275#comment-16631275 ] Leanken.Lin edited comment on FLINK-4399 at 9/28/18 2:26 AM: - Hi. [~till.rohrmann] Do we have any update on this issue? Is anyone still working on it? I've seen another PR [https://github.com/apache/flink/pull/887] related to this JIRA, but it has not been merged yet. was (Author: leanken): Hi. [~till.rohrmann] Do we have some update on this issue? > Add support for oversized messages > -- > > Key: FLINK-4399 > URL: https://issues.apache.org/jira/browse/FLINK-4399 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Environment: FLIP-6 feature branch >Reporter: Stephan Ewen >Priority: Major > Labels: flip-6 > Fix For: 1.7.0 > > > Currently, messages larger than the maximum Akka Framesize cause an error > when being transported. We should add a way to pass messages that are larger > than the Framesize, as may happen for: > - {{collect()}} calls that collect large data sets (via accumulators) > - Job submissions and operator deployments where the function closures are > large (for example because they contain large pre-loaded data) > - Function restore in cases where restored state is larger than > checkpointed state (union state) > I suggest to use the {{BlobManager}} to transfer large payload. > - On the sender side, oversized messages are stored under a transient blob > (which is deleted after first retrieval, or after a certain number of minutes) > - The sender sends a "pointer to blob message" instead. > - The receiver grabs the message from the blob upon receiving the pointer > message > The RPC Service should be optionally initializable with a "large message > handler" which is internally the {{BlobManager}}.
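The pointer-to-blob scheme suggested in the issue can be sketched as follows. Names and the frame-size threshold are hypothetical, and the transient-blob behavior (deleted after first retrieval) is modeled by popping the entry on receive:

```python
class BlobStore:
    """Toy transient blob store: blobs are deleted on first retrieval."""

    def __init__(self):
        self._blobs = {}
        self._next_key = 0

    def put(self, payload):
        key = self._next_key
        self._next_key += 1
        self._blobs[key] = payload
        return key

    def take(self, key):
        # pop() both returns and deletes, giving first-retrieval semantics
        return self._blobs.pop(key)

MAX_FRAME = 64  # hypothetical framesize limit, in bytes

def send(payload, store):
    # Small payloads travel inline; oversized ones are replaced by
    # a pointer message referencing a blob in the store.
    if len(payload) <= MAX_FRAME:
        return ("inline", payload)
    return ("blob", store.put(payload))

def receive(msg, store):
    kind, value = msg
    return value if kind == "inline" else store.take(value)
```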
[jira] [Commented] (FLINK-10398) Add Tanh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631278#comment-16631278 ] ASF GitHub Bot commented on FLINK-10398: yanghua commented on issue #6736: [FLINK-10398][table] Add Tanh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6736#issuecomment-425300319 @pnowojski I also changed this PR based on your suggestion for the `cosh` > Add Tanh math function supported in Table API and SQL > - > > Key: FLINK-10398 > URL: https://issues.apache.org/jira/browse/FLINK-10398 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > refer to : https://www.techonthenet.com/oracle/functions/tanh.php
[jira] [Comment Edited] (FLINK-4399) Add support for oversized messages
[ https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631275#comment-16631275 ] Leanken.Lin edited comment on FLINK-4399 at 9/28/18 2:21 AM: - Hi. [~till.rohrmann] Do we have some update on this issue? was (Author: leanken): Hi. [~till.rohrmann]
[jira] [Commented] (FLINK-4399) Add support for oversized messages
[ https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631275#comment-16631275 ] Leanken.Lin commented on FLINK-4399: Hi. [~till.rohrmann]
[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631252#comment-16631252 ] ASF GitHub Bot commented on FLINK-10247: Clarkkkkk commented on issue #6759: [FLINK-10247][Metrics] Run MetricQueryService in a dedicated actor system URL: https://github.com/apache/flink/pull/6759#issuecomment-425296165 cc @TisonKun @yanghua > Run MetricQueryService in separate thread pool > -- > > Key: FLINK-10247 > URL: https://issues.apache.org/jira/browse/FLINK-10247 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > In order to make the {{MetricQueryService}} run independently of the main > Flink components, it should get its own dedicated thread pool assigned.
[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()
[ https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631165#comment-16631165 ] Bowen Li commented on FLINK-10168: -- [~wind_ljy] are you planning to start working on this issue very soon? If not, I plan to take it over > support filtering files by modified/created time in > StreamExecutionEnvironment.readFile() > - > > Key: FLINK-10168 > URL: https://issues.apache.org/jira/browse/FLINK-10168 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Jiayi Liao >Priority: Major > Fix For: 1.7.0 > > > support filtering files by modified/created time in > {{StreamExecutionEnvironment.readFile()}} > for example, in a source dir with lots of files, we may only want to read files > that were created or modified after a specific time. > This API can expose a generic filter function over files and let users define > filtering rules. Currently Flink only supports filtering files by path; the API is > {{FileInputFormat.setFilesFilters(PathFilter)}}, which takes only one file path > filter. A more generic API that can take more filters could look like this: 1) > {{FileInputFormat.setFilesFilters(List(PathFilter, ModifiedTimeFilter, ... > ))}} > 2) or {{FileInputFormat.setFilesFilters(FileFilter)}}, where {{FileFilter}} > exposes all file attributes that Flink's file system can provide, like path > and modified time > I lean towards the 2nd option, because it gives users more flexibility to > define complex filtering rules based on combinations of file attributes. >
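The 2nd option, a single generic filter over file attributes, could be modeled like this. The `FileInfo`, `accept`, and filter names are hypothetical illustration, not Flink's actual API:

```python
from dataclasses import dataclass

@dataclass
class FileInfo:
    # Attributes a file system could expose to a generic FileFilter.
    path: str
    modified_time: float

def accept(info, filters):
    # A file is read only if every user-supplied predicate accepts it,
    # allowing path and modified-time rules to be combined freely.
    return all(f(info) for f in filters)

cutoff = 1_000_000.0  # hypothetical "only files modified after this" cutoff
filters = [
    lambda f: not f.path.startswith("_"),    # path-based rule
    lambda f: f.modified_time >= cutoff,     # modified-time rule
]
```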
[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631143#comment-16631143 ] ASF GitHub Bot commented on FLINK-10289: isunjin commented on issue #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#issuecomment-425266762 @StephanEwen, I have changed the implementation to use Java annotations; could you take a look? > Classify Exceptions to different category for apply different failover > strategy > --- > > Key: FLINK-10289 > URL: https://issues.apache.org/jira/browse/FLINK-10289 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > We need to classify exceptions and treat them with different strategies. To > do this, we propose to introduce the following Throwable Types, and the > corresponding exceptions: > * NonRecoverable > ** We shouldn't retry if an exception was classified as NonRecoverable > ** For example, NoResourceAvailableException is a NonRecoverable Exception > ** Introduce a new Exception UserCodeException to wrap all exceptions that > are thrown from user code > * PartitionDataMissingError > ** In certain scenarios producer data was transferred in blocking mode or > data was saved in a persistent store. If the partition was missing, we need to > revoke/rerun the producer task to regenerate the data. > ** Introduce a new exception PartitionDataMissingException to wrap all those > kinds of issues. > * EnvironmentError > ** It happened due to hardware or software issues that are related to > specific environments. The assumption is that a task will succeed if we run > it in a different environment, and other tasks run in this bad environment > will very likely fail. If multiple tasks fail on the same machine due to > EnvironmentError, we need to consider adding the bad machine to a blacklist > and avoiding scheduling tasks on it. > ** Introduce a new exception EnvironmentException to wrap all those kinds of > issues. > * Recoverable > ** We assume other issues are recoverable.
[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631036#comment-16631036 ] ASF GitHub Bot commented on FLINK-10289: yingdachen commented on a change in pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#discussion_r221074188 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.throwable; + +import org.apache.flink.runtime.execution.SuppressRestartsException; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * given a exception do the classification. + */ +public class ThrowableClassifier { Review comment: I think we can explore an interface/annotation approach
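As background for the interface/annotation discussion, the classification itself amounts to mapping exception classes to the ThrowableTypes proposed in the issue, falling back to Recoverable. A minimal Python sketch with stand-in exception classes (not Flink's actual classifier):

```python
class ThrowableType:
    # The four categories proposed in FLINK-10289.
    NON_RECOVERABLE = "NonRecoverable"
    PARTITION_DATA_MISSING = "PartitionDataMissingError"
    ENVIRONMENT_ERROR = "EnvironmentError"
    RECOVERABLE = "Recoverable"

# Hypothetical stand-ins for the exception classes named in the issue.
class NoResourceAvailableException(Exception): pass
class PartitionDataMissingException(Exception): pass

# Ordered classification table: first matching class wins.
CLASSIFICATION = [
    (NoResourceAvailableException, ThrowableType.NON_RECOVERABLE),
    (PartitionDataMissingException, ThrowableType.PARTITION_DATA_MISSING),
]

def classify(exc):
    for exc_cls, category in CLASSIFICATION:
        if isinstance(exc, exc_cls):
            return category
    # Everything else is assumed recoverable, per the issue description.
    return ThrowableType.RECOVERABLE
```

An annotation-based variant would attach the category to each exception class directly (e.g. a Java `@ThrowableAnnotation`) instead of keeping a central table; the lookup logic stays the same.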
[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631031#comment-16631031 ] ASF GitHub Bot commented on FLINK-10289: yingdachen commented on a change in pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#discussion_r221073415 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.throwable; + +/** + * */ +public enum ThrowableType { + + /** +* the issue indicate it will not success even retry, such as a DivideZeroException. +* for this kind of exception, we shouldn't consider failover, or we should fail the job +*/ + NonRecoverable, + + /** +* indicate data consumption error, that we should revoke the producer. Review comment: data consumption error, which indicates that ... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
> Classify Exceptions to different category for apply different failover strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Reporter: JIN SUN
> Assignee: JIN SUN
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> We need to classify exceptions and treat them with different strategies. To do this, we propose to introduce the following throwable types and the corresponding exceptions:
> * NonRecoverable
> ** We shouldn't retry if an exception was classified as NonRecoverable.
> ** For example, NoResourceAvailableException is a NonRecoverable exception.
> ** Introduce a new exception, UserCodeException, to wrap all exceptions thrown from user code.
> * PartitionDataMissingError
> ** In certain scenarios, producer data was transferred in blocking mode or saved in a persistent store. If the partition is missing, we need to revoke/rerun the producer task to regenerate the data.
> ** Introduce a new exception, PartitionDataMissingException, to wrap all those kinds of issues.
> * EnvironmentError
> ** Caused by hardware or software issues tied to a specific environment. The assumption is that a task will succeed if we run it in a different environment, and other tasks run in this bad environment will very likely fail. If multiple tasks fail on the same machine due to EnvironmentError, we should consider adding the bad machine to a blacklist and avoid scheduling tasks on it.
> ** Introduce a new exception, EnvironmentException, to wrap all those kinds of issues.
> * Recoverable
> ** We assume all other issues are recoverable.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
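Put together, the proposal reads roughly like the following enum. This is a sketch that applies the reviewer's renaming suggestion (NonRecoverableError, for consistency with the other constants) and the javadoc wording fixes suggested in the thread; the merged code may differ.

```java
/**
 * Sketch of the proposed failure classification from FLINK-10289,
 * with the reviewer's NonRecoverableError renaming applied.
 */
enum ThrowableType {

	/**
	 * An error that would not succeed even with retry (e.g. a divide-by-zero
	 * in user code). No failover should be attempted; the job should fail
	 * immediately.
	 */
	NonRecoverableError,

	/**
	 * Producer partition data is missing; the producer task should be
	 * revoked and rerun to regenerate the data.
	 */
	PartitionDataMissingError,

	/**
	 * An error tied to the running environment (hardware failure, bad
	 * machine); consider blacklisting the machine and rescheduling
	 * elsewhere.
	 */
	EnvironmentError,

	/** Any other error, assumed recoverable by a plain retry. */
	RecoverableError
}
```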
[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631032#comment-16631032 ] ASF GitHub Bot commented on FLINK-10289: yingdachen commented on a change in pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#discussion_r221073431 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.throwable; + +/** + * */ +public enum ThrowableType { + + /** +* the issue indicate it will not success even retry, such as a DivideZeroException. +* for this kind of exception, we shouldn't consider failover, or we should fail the job +*/ + NonRecoverable, + + /** +* indicate data consumption error, that we should revoke the producer. +* */ + PartitionDataMissingError, + + /** +* this indicate errors such us Hardware error, service issue, that we should consider blacklist the machine. Review comment: this indicates error related to running environment, such as ..., in which case we should consider... 
[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631033#comment-16631033 ] ASF GitHub Bot commented on FLINK-10289: yingdachen commented on a change in pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#discussion_r221073448 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.throwable; + +/** + * */ +public enum ThrowableType { + + /** +* the issue indicate it will not success even retry, such as a DivideZeroException. +* for this kind of exception, we shouldn't consider failover, or we should fail the job +*/ + NonRecoverable, + + /** +* indicate data consumption error, that we should revoke the producer. +* */ + PartitionDataMissingError, + + /** +* this indicate errors such us Hardware error, service issue, that we should consider blacklist the machine. +* */ + EnvironmentError, + + /** +* this indicate other errors that recoverable. 
Review comment: this indicates other error that is recoverable
[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631030#comment-16631030 ] ASF GitHub Bot commented on FLINK-10289: yingdachen commented on a change in pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#discussion_r221073399 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.throwable; + +/** + * */ +public enum ThrowableType { + + /** +* the issue indicate it will not success even retry, such as a DivideZeroException. +* for this kind of exception, we shouldn't consider failover, or we should fail the job +*/ + NonRecoverable, Review comment: NonRecoverableError to be more consistent with other types This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631029#comment-16631029 ] ASF GitHub Bot commented on FLINK-10289: yingdachen commented on a change in pull request #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#discussion_r221073384 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableType.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.throwable; + +/** + * */ +public enum ThrowableType { + + /** +* the issue indicate it will not success even retry, such as a DivideZeroException. Review comment: this indicates error that would not succeed even with retry, such as DivideZeroExeception. No failover should be attempted with such error. Instead, the job should fail immediately. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630888#comment-16630888 ] ASF GitHub Bot commented on FLINK-10205: xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#discussion_r221036186 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() { return locationConstraint; } + public InputSplit getNextInputSplit(int index, String host) { + final int taskId = this.getParallelSubtaskIndex(); + synchronized (this.inputSplits) { + if (index < this.inputSplits.size()) { + return this.inputSplits.get(index); + } else { + final InputSplit nextInputSplit = this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId); Review comment: OK, I think there shouldn't be too much re-computation overhead, since it happens once per attempt. I am fine with either way. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager > Affects Versions: 1.6.1, 1.6.2 > Reporter: JIN SUN > Assignee: JIN SUN > Priority: Major > Labels: pull-request-available > Original Estimate: 168h > Remaining Estimate: 168h > > Today the DataSourceTask pulls InputSplits from the JobManager to achieve better > performance; however, when a DataSourceTask fails and reruns, it will not get > the same splits as its previous attempt. This will introduce inconsistent > results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch > scenario), these two executions should process the same splits. > We need to fix this issue to make the inputs of a DataSourceTask > deterministic. The proposal is to save all splits into the ExecutionVertex; the > DataSourceTask will then pull splits from there. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
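The caching scheme discussed in this thread can be sketched as follows. This is a minimal, self-contained illustration with hypothetical names (splits are reduced to plain Strings and the InputSplitAssigner to an Iterator), not Flink's actual classes: the vertex records every split it hands out, indexed by request order, so a restarted attempt replaying requests 0, 1, 2, ... receives exactly the splits the failed attempt saw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the per-vertex split cache proposed in the PR.
class SplitReplayCache {
    private final List<String> inputSplits = new ArrayList<>();
    private final Iterator<String> assigner; // stands in for the InputSplitAssigner

    SplitReplayCache(Iterator<String> assigner) {
        this.assigner = assigner;
    }

    synchronized String getNextInputSplit(int index) {
        if (index < inputSplits.size()) {
            // Replay: serve the cached split so a re-run attempt is deterministic.
            return inputSplits.get(index);
        }
        // First request for this index: pull from the assigner and remember it.
        // (The terminal null is cached too, which is what xndai's nit is about.)
        String next = assigner.hasNext() ? assigner.next() : null;
        inputSplits.add(next);
        return next;
    }
}

public class SplitReplayDemo {
    public static void main(String[] args) {
        SplitReplayCache cache = new SplitReplayCache(Arrays.asList("split-a", "split-b").iterator());
        String first = cache.getNextInputSplit(0);
        String second = cache.getNextInputSplit(1);
        // A failed-and-restarted attempt replays from index 0 and sees the same splits.
        if (!first.equals(cache.getNextInputSplit(0)) || !second.equals(cache.getNextInputSplit(1))) {
            throw new AssertionError("replay must be deterministic");
        }
        System.out.println("replayed splits match: " + first + ", " + second);
    }
}
```

Serving cached splits by index also covers the second concern in the issue: two concurrent attempts of the same subtask that issue the same sequence of requests observe the same splits.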
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630841#comment-16630841 ] ASF GitHub Bot commented on FLINK-10205: isunjin commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#discussion_r221024426 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() { return locationConstraint; } + public InputSplit getNextInputSplit(int index, String host) { + final int taskId = this.getParallelSubtaskIndex(); + synchronized (this.inputSplits) { + if (index < this.inputSplits.size()) { + return this.inputSplits.get(index); + } else { + final InputSplit nextInputSplit = this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId); Review comment: Putting null here is to avoid recomputing; otherwise, if another execution attempt pulls getNextSplit, we would need to recompute. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10057) optimalize org.apache.flink.yarn.cli.FlinkYarnSessionCli.isYarnPropertiesFileMode
[ https://issues.apache.org/jira/browse/FLINK-10057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630825#comment-16630825 ] ASF GitHub Bot commented on FLINK-10057: lzqdename commented on issue #6491: [FLINK-10057] Update FlinkYarnSessionCli.java URL: https://github.com/apache/flink/pull/6491#issuecomment-425186485 @TisonKun @StefanRRichter will this PR be merged into master? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > optimalize > org.apache.flink.yarn.cli.FlinkYarnSessionCli.isYarnPropertiesFileMode > - > > Key: FLINK-10057 > URL: https://issues.apache.org/jira/browse/FLINK-10057 > Project: Flink > Issue Type: Improvement > Components: Client > Affects Versions: 1.5.2 > Reporter: liuzq > Priority: Major > Labels: pull-request-available > > In function org.apache.flink.yarn.cli.FlinkYarnSessionCli.isYarnPropertiesFileMode: > private boolean isYarnPropertiesFileMode(CommandLine commandLine) { boolean canApplyYarnProperties = !commandLine.hasOption(addressOption.getOpt()); // if canApplyYarnProperties is false, return quickly (this check is the proposed addition) if (!canApplyYarnProperties) { return canApplyYarnProperties; } for (Option option : commandLine.getOptions()) { if (allOptions.hasOption(option.getOpt())) { if (!isDetachedOption(option)) { ... > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
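The FLINK-10057 proposal is a plain guard-clause optimization: return as soon as the answer is known instead of scanning the remaining options. A simplified sketch (the real method inspects a commons-cli CommandLine; here the inputs are reduced to booleans, and `optionBlocksPropertiesMode` is a hypothetical stand-in for the per-option checks in the loop):

```java
public class EarlyReturnSketch {

    // Simplified shape of isYarnPropertiesFileMode with the proposed early return.
    static boolean isYarnPropertiesFileMode(boolean addressOptionSet, boolean[] optionBlocksPropertiesMode) {
        boolean canApplyYarnProperties = !addressOptionSet;
        // Proposed change: once the result is known to be false,
        // skip the loop over the remaining options entirely.
        if (!canApplyYarnProperties) {
            return false;
        }
        for (boolean blocks : optionBlocksPropertiesMode) {
            if (blocks) {
                canApplyYarnProperties = false;
                break;
            }
        }
        return canApplyYarnProperties;
    }

    public static void main(String[] args) {
        // Address option set: early return, loop never runs.
        System.out.println(isYarnPropertiesFileMode(true, new boolean[] {false}));  // false
        // No address option, no blocking options: properties-file mode applies.
        System.out.println(isYarnPropertiesFileMode(false, new boolean[] {false})); // true
    }
}
```

The behavior is identical to the original; only the wasted iterations are skipped, which is why the review discussion treats it as a minor improvement rather than a semantic change.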
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630816#comment-16630816 ] ASF GitHub Bot commented on FLINK-10205: isunjin commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#discussion_r221019265 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) { } } + public InputSplit getNextInputSplit() { + final LogicalSlot slot = this.getAssignedResource(); + final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null; Review comment: This is actually a refactoring of the original code (see line 577 of JobMaster.java); we keep the logic the same. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630790#comment-16630790 ] ASF GitHub Bot commented on FLINK-9377: --- StephanEwen commented on issue #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#issuecomment-425180397 @dawidwys and I had a long joint review / discussion session. A lot in this PR goes in a very good direction. We would suggest some not too large changes. Please let us know if you agree; we may have also overlooked or misinterpreted some implications.

**Desired Goals**
- Once this PR is merged, Flink should be in a releasable state, meaning that it should not matter whether we manage to upgrade all serializers to the new model before the release. That is important for the time-based releasing and the uncertainty with respect to Scala code generation in serializers, etc. If we manage to upgrade half of Flink's serializers, it should be perfectly fine.
- There are also users who have written custom serializers and, with those, custom config snapshots. We cannot expect them to update those strictly in sync with our release. It would be good for users to have an easy way to adjust their custom serializers' config snapshots such that they still work.
- The current code follows the assumption that in order to be upgraded, (1) serializers need to bump strictly one version and (2) on their backwards compatibility paths, they read first the serializer (via Java serialization) and then the config snapshot data. Any serializer not following this strict contract will have errors during the reading of the config snapshot data. That seems a hard and fragile contract, especially considering that users also need to follow that path for upgrades.

**Suggested Change**
- We do not completely remove the Java serialization for serializers, but make it optional. Config snapshots can decide whether or not they want the prior serializer to be serialized into the meta info. This is similar to the current backwards compatibility path, but does not put the serialized serializer into the same byte stream and does not make the assumption that the specific value of the version tells you that.
- We extend the `TypeSerializerConfigSnapshot` class as suggested below. This tells the config snapshot writer whether to write the previous serializer. It also removes the need for the "Backward Compatible Wrapper", because the TypeSerializerConfigSnapshot handles the bridging directly.
- We introduce (possibly in a later PR) the `SerializerConfigSnapshotBackwardsAdapter` and the new `SerializerConfigSnapshot`. All updated config snapshots should extend `SerializerConfigSnapshot`, and the not-yet-updated ones should extend `SerializerConfigSnapshotBackwardsAdapter`. We can then make the methods in TypeSerializerConfigSnapshot abstract.
- Once we decide we have given users enough of a grace period to update their serializers and config snapshots, we remove the optional serialization of the previous serializer, remove the old `TypeSerializerConfigSnapshot` class, and keep only the new `SerializerConfigSnapshot` class.
- This should follow the same spirit as this PR, but introduce fewer subtle contracts and decouple versioning from the removal of the Java serialization.

```java
public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable {

    private TypeSerializer<T> priorSerializer;

    public boolean needsPriorSerializerPersisted() {
        return true;
    }

    public void setPriorSerializer(TypeSerializer<T> prior) {
        this.priorSerializer = prior;
    }

    public TypeSerializer<T> getPriorSerializer() {
        return this.priorSerializer;
    }

    public TypeSerializer<T> restoreSerializer() {
        if (priorSerializer != null) {
            return priorSerializer;
        } else {
            throw new IllegalStateException(...);
        }
    }

    // ... the current methods
}

public class SerializerConfigSnapshotBackwardsAdapter<T> extends TypeSerializerConfigSnapshot<T> {
    // same methods as above
}

public abstract class SerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {

    public final boolean needsPriorSerializerPersisted() {
        return false;
    }

    public final void setPriorSerializer(TypeSerializer<T> prior) {}

    public final TypeSerializer<T> getPriorSerializer() {
        throw new UnsupportedOperationException();
    }

    // this strictly needs to be implemented
    public abstract TypeSerializer<T> restoreSerializer();
}
```

What do you think about that? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
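The dispatch the proposal describes can be illustrated with a toy model (hypothetical classes, stripped of Flink's I/O machinery and generics): a not-yet-migrated snapshot still carries the Java-deserialized prior serializer, while a migrated snapshot declines to persist it and rebuilds the serializer itself in restoreSerializer().

```java
// Toy model of the two config-snapshot flavors sketched in the review comment.
abstract class ToySnapshot {
    private Object priorSerializer; // stands in for TypeSerializer<T>

    boolean needsPriorSerializerPersisted() { return true; }

    void setPriorSerializer(Object prior) { this.priorSerializer = prior; }

    Object restoreSerializer() {
        if (priorSerializer != null) {
            return priorSerializer;
        }
        throw new IllegalStateException("prior serializer was not persisted");
    }
}

// Not yet migrated: relies on the persisted prior serializer (the adapter path).
class LegacySnapshot extends ToySnapshot {}

// Migrated: opts out of persisting the serializer and rebuilds it on demand.
class MigratedSnapshot extends ToySnapshot {
    @Override boolean needsPriorSerializerPersisted() { return false; }
    @Override Object restoreSerializer() { return "freshly-built serializer"; }
}

public class SnapshotDispatchDemo {
    public static void main(String[] args) {
        // A checkpoint writer would consult needsPriorSerializerPersisted() to
        // decide whether to Java-serialize the old serializer into the meta info.
        ToySnapshot legacy = new LegacySnapshot();
        legacy.setPriorSerializer("java-deserialized serializer");
        System.out.println(legacy.needsPriorSerializerPersisted());   // true
        System.out.println(legacy.restoreSerializer());

        ToySnapshot migrated = new MigratedSnapshot();
        System.out.println(migrated.needsPriorSerializerPersisted()); // false
        System.out.println(migrated.restoreSerializer());
    }
}
```

This captures why the adapter removes the need for a separate "backward compatible wrapper": the base class already handles the bridging, and deleting the legacy path later only removes the default implementations.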
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630761#comment-16630761 ] ASF GitHub Bot commented on FLINK-10205: xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#discussion_r221002876 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() { return locationConstraint; } + public InputSplit getNextInputSplit(int index, String host) { + final int taskId = this.getParallelSubtaskIndex(); + synchronized (this.inputSplits) { + if (index < this.inputSplits.size()) { + return this.inputSplits.get(index); + } else { + final InputSplit nextInputSplit = this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId); Review comment: nit: When all input splits are exhausted for a given vertex (nextInputSplit is null), you can just return null without adding an extra null element at the end of the array list. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630760#comment-16630760 ] ASF GitHub Bot commented on FLINK-10205: xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#discussion_r221007252 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ## @@ -704,6 +716,66 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep } } + private JobGraph createDataSourceJobGraph() { + final TextInputFormat inputFormat = new TextInputFormat(new Path(".")); + final InputFormatVertex producer = new InputFormatVertex("Producer"); + new TaskConfig(producer.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<>(inputFormat)); + producer.setInvokableClass(DataSourceTask.class); + + final JobVertex consumer = new JobVertex("Consumer"); + consumer.setInvokableClass(NoOpInvokable.class); + consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph(producer, consumer); + jobGraph.setAllowQueuedScheduling(true); + + return jobGraph; + } + + /** +* Tests the {@link JobMaster#requestNextInputSplit(JobVertexID, ExecutionAttemptID)} +* and validates that it will get the same result for a different retry. +*/ + @Test + public void testRequestNextInputSplitWithDataSourceFailover() throws Exception { + + final JobGraph dataSourceJobGraph = createDataSourceJobGraph(); + testJobMasterAPIWithMockExecution(dataSourceJobGraph, (tdd, jobMaster) -> { + try { + final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final TaskInformation taskInformation = tdd.getSerializedTaskInformation() + .deserializeValue(getClass().getClassLoader()); + JobVertexID vertexID = taskInformation.getJobVertexId(); + + // get the previous split + SerializedInputSplit split1 = gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get(); + + // start a new version of this execution + ExecutionGraph executionGraph = jobMaster.getExecutionGraph(); + Execution execution = executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId()); + ExecutionVertex executionVertex = execution.getVertex(); + + long version = execution.getGlobalModVersion(); + gateway.updateTaskExecutionState(new TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), ExecutionState.FINISHED)).get(); + Execution newExecution = executionVertex.resetForNewExecution(System.currentTimeMillis(), version); + + // get the new split + SerializedInputSplit split2 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get(); + + Assert.assertArrayEquals(split1.getInputSplitData(), split2.getInputSplitData()); + + // get the new split3 Review comment: Make sure you cover the case where input splits are exhausted. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630759#comment-16630759 ] ASF GitHub Bot commented on FLINK-10205: xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#discussion_r221001196 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) { } } + public InputSplit getNextInputSplit() { + final LogicalSlot slot = this.getAssignedResource(); + final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null; Review comment: Under which condition will slot be null? And if slot is null, what does vertex.getNextInputSplit() return? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…
xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221002876

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java

```java
@@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
	return locationConstraint;
}

public InputSplit getNextInputSplit(int index, String host) {
	final int taskId = this.getParallelSubtaskIndex();
	synchronized (this.inputSplits) {
		if (index < this.inputSplits.size()) {
			return this.inputSplits.get(index);
		} else {
			final InputSplit nextInputSplit = this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
```

Review comment: nit: When all input splits are exhausted for the given vertex (nextInputSplit is null), you can just return null without adding an extra null element at the end of the array list.
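The nit above can be sketched in isolation. Below is a simplified, standalone version of the split-tracking logic — the `SplitAssigner` interface, `String` splits, and list-backed cache are stand-ins for Flink's actual `ExecutionVertex`/`InputSplitAssigner` types, not the real API — showing both the replay-on-retry behavior and the suggested early return of null when the assigner is exhausted, so no trailing null is cached:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitTrackerSketch {
    // Stand-in for Flink's InputSplitAssigner.
    public interface SplitAssigner {
        String getNextInputSplit(String host, int taskId);
    }

    private final List<String> inputSplits = new ArrayList<>();
    private final SplitAssigner assigner;

    public SplitTrackerSketch(SplitAssigner assigner) {
        this.assigner = assigner;
    }

    /** Returns the cached split for a retried request, or pulls a new one from the assigner. */
    public String getNextInputSplit(int index, String host, int taskId) {
        synchronized (inputSplits) {
            if (index < inputSplits.size()) {
                return inputSplits.get(index); // replay the same split for fault tolerance
            }
            String next = assigner.getNextInputSplit(host, taskId);
            if (next == null) {
                return null; // exhausted: return directly, do not cache a trailing null
            }
            inputSplits.add(next);
            return next;
        }
    }

    public int cachedCount() {
        synchronized (inputSplits) {
            return inputSplits.size();
        }
    }
}
```

With this shape, a retried request for index 0 always sees the same split, and exhausting the assigner leaves the cache size unchanged.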
[GitHub] xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…
xndai commented on a change in pull request #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#discussion_r221001196

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java

```java
@@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
	}
}

public InputSplit getNextInputSplit() {
	final LogicalSlot slot = this.getAssignedResource();
	final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
```

Review comment: Under which condition will slot be null? If slot is null, what does vertex.getNextInputSplit() return?
[jira] [Commented] (FLINK-10397) Remove CoreOptions#MODE
[ https://issues.apache.org/jira/browse/FLINK-10397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630712#comment-16630712 ] ASF GitHub Bot commented on FLINK-10397:
tillrohrmann commented on issue #6752: [FLINK-10397] Remove CoreOptions#MODE
URL: https://github.com/apache/flink/pull/6752#issuecomment-425159955
I agree with @TisonKun. Removing the `EXECUTION_MODE_OPTION` and `isNewMode` can be done in another subtask of FLINK-10392.

> Remove CoreOptions#MODE
> ---
>
> Key: FLINK-10397
> URL: https://issues.apache.org/jira/browse/FLINK-10397
> Project: Flink
> Issue Type: Sub-task
> Components: Configuration
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Remove the {{CoreOptions#MODE}} since it is no longer needed.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10399) Refractor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630706#comment-16630706 ] ASF GitHub Bot commented on FLINK-10399:
TisonKun commented on issue #6737: [FLINK-10399] Refractor ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-425159102
ping @StephanEwen what is the next step? I am confused about why this has stalled.

> Refractor ParameterTool#fromArgs
> ---
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
> Issue Type: Improvement
> Components: Client
> Affects Versions: 1.7.0
> Reporter: tison
> Assignee: tison
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> {{ParameterTool#fromArgs}} uses an awkward implementation that Flink developers would struggle to follow quickly.
> The main problem is that, when parsing args, we always try to get a key-value pair, but the implementation iterates with a {{for}} loop, thus introducing awkward flags, mutable variables, and branches.
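The refactoring idea described in the issue above — consuming args as explicit key-value pairs with a cursor, rather than a flag-laden `for` loop — could look roughly like the following. This is a sketch, not Flink's actual `ParameterTool` implementation; the `--`/`-` key prefixes and the `__NO_VALUE_KEY` placeholder for value-less flags are assumptions for illustration (negative-number values would need extra handling):

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsParserSketch {
    // Strips a leading "--" or "-" from a key token; returns null if the token is not a key.
    private static String keyOf(String token) {
        if (token.startsWith("--")) {
            return token.substring(2);
        }
        if (token.startsWith("-")) {
            return token.substring(1);
        }
        return null;
    }

    /** Parses args as a sequence of (key[, value]) pairs using an explicit cursor. */
    public static Map<String, String> fromArgs(String[] args) {
        Map<String, String> result = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            String key = keyOf(args[i]);
            if (key == null || key.isEmpty()) {
                throw new IllegalArgumentException("Error parsing arguments: expected a key at position " + i);
            }
            i++;
            if (i < args.length && keyOf(args[i]) == null) {
                result.put(key, args[i]); // key followed by a value
                i++;
            } else {
                result.put(key, "__NO_VALUE_KEY"); // key followed by another key: boolean flag
            }
        }
        return result;
    }
}
```

Each loop iteration consumes exactly one key and optionally its value, so no carried-over flag variable is needed between iterations.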
[jira] [Commented] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing
[ https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630660#comment-16630660 ] Nico Kruber commented on FLINK-10419:
-
The initial error on the task managers leading to the error above apparently was the following, caused by the Kafka brokers' transaction timeout being too low for the transactions to commit:
{code}
java.lang.RuntimeException: Error while confirming checkpoint
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:1258)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
{code}

> ClassNotFoundException while deserializing user exceptions from checkpointing
> ---
>
> Key: FLINK-10419
> URL: https://issues.apache.org/jira/browse/FLINK-10419
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
>
> It seems that somewhere in the operator's failure handling, we hand a
> user-code exception to the checkpoint coordinator via Java serialization but
> it will then fail during the de-serialization because the class is not
> available.
> This will result in the following error shadowing the real one:
> {code}
> java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:348)
> 	at org.apache.flink.util.InstantiationUtil.resolveClass(InstantiationUtil.java:76)
> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1859)
> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
> 	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:557)
> 	at java.lang.Throwable.readObject(Throwable.java:914)
> 	at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> 	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.readObject(RemoteRpcInvocation.java:222)
> 	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
> 	at
[jira] [Updated] (FLINK-10419) ClassNotFoundException while deserializing user exceptions from checkpointing
[ https://issues.apache.org/jira/browse/FLINK-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-10419:
Description: It seems that somewhere in the operator's failure handling, we hand a user-code exception to the checkpoint coordinator via Java serialization but it will then fail during the de-serialization because the class is not available. This will result in the following error shadowing the real one:
{code}
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.flink.util.InstantiationUtil.resolveClass(InstantiationUtil.java:76)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1859)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:557)
	at java.lang.Throwable.readObject(Throwable.java:914)
	at sun.reflect.GeneratedMethodAccessor158.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.readObject(RemoteRpcInvocation.java:222)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:502)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:489)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:477)
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.deserializeMethodInvocation(RemoteRpcInvocation.java:118)
	at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.getMethodName(RemoteRpcInvocation.java:59)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:214)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at
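The shadowing described above happens because Java serialization of a `Throwable` embeds its concrete class name, so the receiving side must have that class on its classpath just to read the message. The usual mitigation — a pattern like Flink's own `SerializedThrowable` utility — is to capture the exception's identity as plain strings before serializing, so deserialization never needs the user-code class. The sketch below illustrates the idea; it is a simplified stand-in, not the actual Flink class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedThrowableSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only plain strings are serialized, so the receiver never needs
    // the original exception class (e.g. FlinkKafka011Exception) on its classpath.
    private final String originalClassName;
    private final String message;

    public SerializedThrowableSketch(Throwable t) {
        this.originalClassName = t.getClass().getName();
        this.message = t.getMessage();
    }

    public String describe() {
        return originalClassName + ": " + message;
    }

    /** Round-trips an object through Java serialization, as an RPC layer would. */
    public static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T result = (T) ois.readObject();
            return result;
        }
    }
}
```

Serializing the raw `Throwable` instead would require `FlinkKafka011Exception` on the coordinator's classpath; the string-based wrapper survives the round trip regardless of which classes the receiver knows.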
[jira] [Created] (FLINK-10455) Potential Kafka producer leak in case of failures
Nico Kruber created FLINK-10455:
---
Summary: Potential Kafka producer leak in case of failures
Key: FLINK-10455
URL: https://issues.apache.org/jira/browse/FLINK-10455
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.5.2
Reporter: Nico Kruber

If the Kafka brokers' timeout is too low for our checkpoint interval [1], we may get a {{ProducerFencedException}}. The documentation around {{ProducerFencedException}} explicitly states that we should close the producer after encountering it. Looking at the code, it doesn't seem like this is actually done in {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an exception, we don't clean up (nor try to commit) any other transaction.
-> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} simply iterates over the {{pendingCommitTransactions}}, which is not touched during {{close()}}
Now if we restart the failing job on the same Flink cluster, any resources from the previous attempt will still linger around.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
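The cleanup concern in the issue above can be illustrated abstractly: if pending commits are attempted in a plain loop, the first failing transaction aborts the loop and the remaining transactions are neither committed nor closed, leaking their producers. A defensive variant attempts every pending transaction, closes each one regardless of outcome, and rethrows only afterwards. This is a sketch of the general pattern under a hypothetical `Transaction` interface, not the actual `TwoPhaseCommitSinkFunction` code:

```java
import java.util.List;

public class CommitAllSketch {
    // Hypothetical stand-in for a pending Kafka transaction handle.
    public interface Transaction {
        void commit() throws Exception;
        void close();
    }

    /**
     * Tries to commit every pending transaction, closes each one regardless
     * of success, and rethrows the first failure only after all have been attempted.
     */
    public static void commitAll(List<Transaction> pending) throws Exception {
        Exception firstFailure = null;
        for (Transaction t : pending) {
            try {
                t.commit();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e; // remember, but keep going
                }
            } finally {
                t.close(); // release producer resources even on failure
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
```

The key design choice is that a failure in one transaction no longer prevents the commit and close of the others, so no producer lingers across a job restart.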
[jira] [Commented] (FLINK-10397) Remove CoreOptions#MODE
[ https://issues.apache.org/jira/browse/FLINK-10397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630646#comment-16630646 ] ASF GitHub Bot commented on FLINK-10397: tillrohrmann commented on a change in pull request #6752: [FLINK-10397] Remove CoreOptions#MODE URL: https://github.com/apache/flink/pull/6752#discussion_r220978579 ## File path: flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestBase.java ## @@ -19,43 +19,21 @@ package org.apache.flink.client.cli; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.util.TestLogger; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - /** * Base test class for {@link CliFrontend} tests that wraps the new vs. legacy mode. Review comment: Good catch. Will update it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove CoreOptions#MODE > --- > > Key: FLINK-10397 > URL: https://issues.apache.org/jira/browse/FLINK-10397 > Project: Flink > Issue Type: Sub-task > Components: Configuration >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Remove the {{CoreOptions#MODE}} since it is no longer needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630623#comment-16630623 ] ASF GitHub Bot commented on FLINK-10384: yanghua edited a comment on issue #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6730#issuecomment-425139027 @pnowojski I have refactored this PR's documentation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Sinh math function supported in Table API and SQL > - > > Key: FLINK-10384 > URL: https://issues.apache.org/jira/browse/FLINK-10384 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > like FLINK-10340 for adding Cosh math function -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630622#comment-16630622 ] ASF GitHub Bot commented on FLINK-10384: yanghua commented on issue #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6730#issuecomment-425139027 @pnowojski also refactored this PR's documentation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Sinh math function supported in Table API and SQL > - > > Key: FLINK-10384 > URL: https://issues.apache.org/jira/browse/FLINK-10384 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > like FLINK-10340 for adding Cosh math function -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630616#comment-16630616 ] ASF GitHub Bot commented on FLINK-9455: --- tillrohrmann commented on issue #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors URL: https://github.com/apache/flink/pull/6734#issuecomment-425136715 Thanks for the review @azagrebin, @GJL and @shuai-xu. Merging this PR once Travis gives green light. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make SlotManager aware of multi slot TaskManagers > - > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, ResourceManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The {{SlotManager}} responsible for managing all available slots of a Flink > cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot > request. The started {{TaskManager}} can be started with multiple slots > configured but currently, the {{SlotManager}} thinks that it will be started > with a single slot. As a consequence, it might issue multiple requests to > start new TaskManagers even though a single one would be sufficient to > fulfill all pending slot requests. > In order to avoid requesting unnecessary resources which are freed after the > idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a > {{TaskManager}} is started with. That way the SlotManager only needs to > request a new {{TaskManager}} if all of the previously started slots > (potentially not yet registered and, thus, future slots) are being assigned > to slot requests. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
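The arithmetic behind the FLINK-9455 improvement above is simple: with multi-slot TaskManagers, the number of new TaskManagers to request is the ceiling of the unfulfilled slot requests divided by the slots per TaskManager, after subtracting slots that are already pending registration (the "future slots" mentioned in the issue). A sketch with a hypothetical helper, not the actual SlotManager API:

```java
public class SlotMathSketch {
    /**
     * Number of additional TaskManagers to start so that all pending slot
     * requests can be fulfilled, given slots already on the way.
     */
    public static int taskManagersToRequest(int pendingSlotRequests, int pendingSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        int missing = pendingSlotRequests - pendingSlots;
        if (missing <= 0) {
            return 0; // enough slots are already pending; request nothing
        }
        // ceiling division without floating point
        return (missing + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }
}
```

For example, 5 pending requests with 2 slots per TaskManager need 3 TaskManagers; a SlotManager that assumes one slot per TaskManager would have requested 5, over-allocating resources that are only freed after the idle timeout.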
[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection
[ https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630601#comment-16630601 ] ASF GitHub Bot commented on FLINK-10415: tillrohrmann commented on issue #6763: [FLINK-10415] Fail response future if connection closes in RestClient URL: https://github.com/apache/flink/pull/6763#issuecomment-425132571 Yes, I will increase the idleness timeout to 5 minutes. Merging this PR then. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RestClient does not react to lost connection > > > Key: FLINK-10415 > URL: https://issues.apache.org/jira/browse/FLINK-10415 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.6.1, 1.7.0, 1.5.4 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not > seem to react to a lost connections in time. When sending a request to the > current leader it happened that the leader was killed just after establishing > the connection. Then the {{RestClient}} did not fail the connection and was > stuck in writing a request or retrieving a response from the lost leader. I'm > wondering whether we should introduce a {{ReadTimeoutHandler}} and > {{WriteTimeoutHandler}} to handle these problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630600#comment-16630600 ] ASF GitHub Bot commented on FLINK-10402: StefanRRichter commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base URL: https://github.com/apache/flink/pull/6750#discussion_r220965813 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -397,7 +397,7 @@ private Configuration generateClusterConfiguration(Configuration configuration) return resultConfiguration; } - private CompletableFuture shutDownAsync( + public CompletableFuture shutDownAsync( Review comment: Hmm, I would leave that call up to you. My feeling is when somebody receives a reference of this class that is not allowed to call `shutDownAsync`, I would better not make it public at all to prevent that clients call it accidentally. If that is not the case, yes why not use the interface that we have for the concept. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AbstractTaskManagerProcessFailureRecoveryTest to new code base > --- > > Key: FLINK-10402 > URL: https://issues.apache.org/jira/browse/FLINK-10402 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630595#comment-16630595 ] ASF GitHub Bot commented on FLINK-10402:
tillrohrmann commented on issue #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
URL: https://github.com/apache/flink/pull/6750#issuecomment-425131771
Thanks for the review @StefanRRichter.

> Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
> ---
>
> Key: FLINK-10402
> URL: https://issues.apache.org/jira/browse/FLINK-10402
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630592#comment-16630592 ] ASF GitHub Bot commented on FLINK-10402: tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base URL: https://github.com/apache/flink/pull/6750#discussion_r220964730 ## File path: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ## @@ -173,10 +140,6 @@ public void testTaskManagerProcessFailure() throws Exception { taskManagerProcess2 = new ProcessBuilder(command).start(); new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2); - // we wait for the JobManager to have the two TaskManagers available - // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes) - waitUntilNumTaskManagersAreRegistered(jmActor, 2, 12); Review comment: Yes, we can skip this part, because we now support queued scheduling, where the TMs don't need to be present before you start the job. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AbstractTaskManagerProcessFailureRecoveryTest to new code base > --- > > Key: FLINK-10402 > URL: https://issues.apache.org/jira/browse/FLINK-10402 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10410) Search for broken links on travis
[ https://issues.apache.org/jira/browse/FLINK-10410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630591#comment-16630591 ] Piotr Nowojski commented on FLINK-10410: Ok, thanks [~Zentol] :) > Search for broken links on travis > - > > Key: FLINK-10410 > URL: https://issues.apache.org/jira/browse/FLINK-10410 > Project: Flink > Issue Type: Improvement > Components: Documentation, Travis >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.7.0 > > > In the {{/docs/check_links.sh}} directory we have a handy script for > searching dead links. We could use this to automatically find dead links on > travis. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630590#comment-16630590 ] ASF GitHub Bot commented on FLINK-10402: tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base URL: https://github.com/apache/flink/pull/6750#discussion_r220964322 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ## @@ -397,7 +397,7 @@ private Configuration generateClusterConfiguration(Configuration configuration) return resultConfiguration; } - private CompletableFuture shutDownAsync( + public CompletableFuture shutDownAsync( Review comment: What about letting `ClusterEntrypoint` implement `AutoCloseableAsync` and make this public? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AbstractTaskManagerProcessFailureRecoveryTest to new code base > --- > > Key: FLINK-10402 > URL: https://issues.apache.org/jira/browse/FLINK-10402 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
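The `AutoCloseableAsync` suggestion discussed above can be illustrated with a small, self-contained sketch. This is plain Java and not Flink's actual code; the names `AutoCloseableAsync` and `ClusterEntrypointSketch` here are illustrative stand-ins for the classes being discussed. The point of the pattern: clients receive only the narrow async-close interface, while the concrete shutdown method stays non-public so it cannot be called accidentally.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the pattern under discussion, not Flink's real code:
// expose shutdown through a small async-closeable interface instead of making
// the concrete shutDownAsync method part of the public surface.
interface AutoCloseableAsync {
    CompletableFuture<Void> closeAsync();
}

class ClusterEntrypointSketch implements AutoCloseableAsync {

    private volatile boolean shutDown = false;

    @Override
    public CompletableFuture<Void> closeAsync() {
        // Clients only ever see this interface method...
        return shutDownAsync();
    }

    // ...while the concrete shutdown logic stays non-public,
    // so holders of a reference cannot invoke it directly.
    private CompletableFuture<Void> shutDownAsync() {
        shutDown = true;
        return CompletableFuture.completedFuture(null);
    }

    boolean isShutDown() {
        return shutDown;
    }
}

public class Main {
    public static void main(String[] args) {
        // Clients are handed the interface, not the concrete type.
        AutoCloseableAsync handle = new ClusterEntrypointSketch();
        handle.closeAsync().join();
    }
}
```

The trade-off Stefan raises is visible here: if some holders of the reference must not shut the component down, widening visibility (even via an interface) removes that protection; if all holders may, the interface is the cleaner contract.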
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630589#comment-16630589 ] ASF GitHub Bot commented on FLINK-9712: --- pnowojski commented on issue #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#issuecomment-425130543 I have addressed almost all of the comments (or wrote a reply where I have not). Can you check the revised PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support enrichment joins in Flink SQL/Table API > --- > > Key: FLINK-9712 > URL: https://issues.apache.org/jira/browse/FLINK-9712 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > As described here: > https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10402) Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630583#comment-16630583 ] ASF GitHub Bot commented on FLINK-10402: tillrohrmann commented on a change in pull request #6750: [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base URL: https://github.com/apache/flink/pull/6750#discussion_r220961425 ## File path: flink-tests/src/test/resources/log4j-test.properties ## @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -log4j.rootLogger=OFF, testlogger +log4j.rootLogger=INFO, testlogger Review comment: Yes, will revert it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port AbstractTaskManagerProcessFailureRecoveryTest to new code base > --- > > Key: FLINK-10402 > URL: https://issues.apache.org/jira/browse/FLINK-10402 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AbstractTaskManagerProcessFailureRecoveryTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630578#comment-16630578 ] ASF GitHub Bot commented on FLINK-9712: --- pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220960721

## File path: docs/dev/table/streaming/temporal_tables.md ##

@@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+Temporal Tables represent a concept of a table that changes over time,
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Let's assume that we have the two following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15   2      Euro
+10:30   1      US Dollar
+10:32   50     Yen
+10:52   3      Euro
+11:04   5      US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+09:00   Euro      114
+09:00   Yen       1
+10:45   Euro      116
+11:15   Euro      119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing, append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` was `114` for the period from `09:00` to `10:45`, and `116` from `10:45` to `11:15`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15   2      Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM Rates AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required; it determines which row is newer and which is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in a Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table,
+we could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+09:00   Euro      114
+09:00   Yen       1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+10:45   Euro      116
+09:00   Yen       1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+The above example was only meant to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---------------
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();

Review comment: Added more comments, please check them. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630573#comment-16630573 ] ASF GitHub Bot commented on FLINK-9712: --- pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220960460

## File path: docs/dev/table/streaming/temporal_tables.md ##

@@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+Temporal Tables represent a concept of a table that changes over time,
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Let's assume that we have the two following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15   2      Euro
+10:30   1      US Dollar
+10:32   50     Yen
+10:52   3      Euro
+11:04   5      US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+09:00   Euro      114
+09:00   Yen       1
+10:45   Euro      116
+11:15   Euro      119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing, append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` was `114` for the period from `09:00` to `10:45`, and `116` from `10:45` to `11:15`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15   2      Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM Rates AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required; it determines which row is newer and which is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in a Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table,
+we could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+09:00   Euro      114
+09:00   Yen       1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+10:45   Euro      116
+09:00   Yen       1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+The above example was only meant to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---------------
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
+ratesHistoryData.add(Tuple2.of("Euro", 114L));
+ratesHistoryData.add(Tuple2.of("Yen", 1L));
+ratesHistoryData.add(Tuple2.of("Euro", 116L));
+ratesHistoryData.add(Tuple2.of("Euro", 119L));
+
+DataStreamSource<Tuple2<Long, String>> ordersStream =
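The lookup semantics the quoted documentation describes — for each primary key, return the latest row whose `rowtime` is not after the given `timeAttribute` — can be sketched with plain JDK collections. This is only an illustration of the semantics (times encoded as minutes since midnight); it is not Flink's `TemporalTableFunction`, and the names `RatesHistoryView` and `ratesAt` are made up for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-JDK sketch of the semantics described in the quoted docs:
// Rates(timeAttribute) returns, per primary key (currency), the latest rate
// whose rowtime is <= timeAttribute. Illustration only, not Flink code.
class RatesHistoryView {

    // currency -> (rowtime -> rate), versions sorted by rowtime
    private final Map<String, NavigableMap<Integer, Long>> history = new HashMap<>();

    void add(int rowtime, String currency, long rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(rowtime, rate);
    }

    /** The latest version of every key as of the given time attribute. */
    Map<String, Long> ratesAt(int timeAttribute) {
        Map<String, Long> snapshot = new HashMap<>();
        history.forEach((currency, versions) -> {
            // floorEntry picks the greatest rowtime <= timeAttribute
            Map.Entry<Integer, Long> latest = versions.floorEntry(timeAttribute);
            if (latest != null) {
                snapshot.put(currency, latest.getValue());
            }
        });
        return snapshot;
    }
}

public class Main {
    public static void main(String[] args) {
        RatesHistoryView rates = new RatesHistoryView();
        rates.add(9 * 60, "US Dollar", 102L);
        rates.add(9 * 60, "Euro", 114L);
        rates.add(9 * 60, "Yen", 1L);
        rates.add(10 * 60 + 45, "Euro", 116L);
        rates.add(11 * 60 + 15, "Euro", 119L);

        // Rates('10:15') sees Euro at 114; Rates('11:00') sees Euro at 116
        System.out.println(rates.ratesAt(10 * 60 + 15).get("Euro")); // 114
        System.out.println(rates.ratesAt(11 * 60).get("Euro"));      // 116
    }
}
```

With the `RatesHistory` data from the quoted docs, this reproduces the two `SELECT * FROM Rates(...)` snapshots: the `Euro` rate is `114` as of `10:15` and `116` as of `11:00`.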
[jira] [Commented] (FLINK-10410) Search for broken links on travis
[ https://issues.apache.org/jira/browse/FLINK-10410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630566#comment-16630566 ] Chesnay Schepler commented on FLINK-10410: -- You can also fork the branch {{cron-master-docs}} and configure another remote/branch in {{.travis.yml}}. > Search for broken links on travis > - > > Key: FLINK-10410 > URL: https://issues.apache.org/jira/browse/FLINK-10410 > Project: Flink > Issue Type: Improvement > Components: Documentation, Travis >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.7.0 > > > In the {{/docs/check_links.sh}} directory we have a handy script for > searching dead links. We could use this to automatically find dead links on > travis. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6873) Limit the number of open writers in file system connector
[ https://issues.apache.org/jira/browse/FLINK-6873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630562#comment-16630562 ] Oscar Westra van Holthe - Kind commented on FLINK-6873: --- What may also be an issue is that the BucketingSink and the newer StreamingFileSink seem to ignore event time. Thus, if your output stream uses a buffer of some sort and your job catches up quickly (processing multiple days' worth of events in a few hours), the sink may end up having too many open files. > Limit the number of open writers in file system connector > - > > Key: FLINK-6873 > URL: https://issues.apache.org/jira/browse/FLINK-6873 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector, Local Runtime, Streaming Connectors >Reporter: Mu Kong >Priority: Major > > Mail list discuss: > https://mail.google.com/mail/u/1/#label/MailList%2Fflink-dev/15c869b2a5b20d43 > Following exception will occur when Flink is writing to too many files: > {code} > java.lang.OutOfMemoryError: unable to create new native thread > at java.lang.Thread.start0(Native Method) > at java.lang.Thread.start(Thread.java:714) > at org.apache.hadoop.hdfs.DFSOutputStream.start(DFSOutputStream.java:2170) > at > org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1685) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624) > at > org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448) > at > org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909) > at
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) > at > org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:120) > at > org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:62) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:545) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:440) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:230) > at > org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:379) > {code} > Letting developers decide the max open connections to the open files would be > great. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
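One common way to cap the number of simultaneously open part files, as requested in this issue, is to keep writers in an LRU pool and close the least-recently-used writer once a limit is exceeded. The sketch below is not Flink's actual BucketingSink code; it is a minimal Python model, and the names (`WriterPool`, `max_open`, the bucket paths) are hypothetical.

```python
from collections import OrderedDict

class WriterPool:
    """Keep at most max_open writers; close the least recently used on overflow."""

    def __init__(self, max_open):
        self.max_open = max_open
        self.open_writers = OrderedDict()  # path -> writer, ordered by recency
        self.closed = []                   # paths whose writers were closed early

    def get(self, path):
        if path in self.open_writers:
            self.open_writers.move_to_end(path)  # mark as most recently used
            return self.open_writers[path]
        if len(self.open_writers) >= self.max_open:
            old_path, _ = self.open_writers.popitem(last=False)  # evict LRU
            self.closed.append(old_path)  # a real sink would flush/close here
        self.open_writers[path] = object()  # stand-in for a real file handle
        return self.open_writers[path]

pool = WriterPool(max_open=2)
for bucket in ["2017-06-08/00", "2017-06-08/01", "2017-06-08/00", "2017-06-08/02"]:
    pool.get(bucket)
```

With a cap of 2, requesting a third distinct bucket closes the writer that was used least recently ("2017-06-08/01" above), so the number of open native threads/handles stays bounded regardless of how many buckets the catch-up job touches.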
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630560#comment-16630560 ] ASF GitHub Bot commented on FLINK-9712:
---
pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220957402

## File path: docs/dev/table/streaming/temporal_tables.md
## @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Let's assume that we have the two following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15   2      Euro
+10:30   1      US Dollar
+10:32   50     Yen
+10:52   3      Euro
+11:04   5      US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+09:00   Euro      114
+09:00   Yen       1
+10:45   Euro      116
+11:15   Euro      119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all of the `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15   2      Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without using Temporal Tables, in order to do so one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be a primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required,
+which determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in the Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assume that we defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table.
+We could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+09:00   Euro      114
+09:00   Yen       1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency  rate
+======= ========= ====
+09:00   US Dollar 102
+10:45   Euro      116
+09:00   Yen       1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying the Temporal Table Functions with a constant `timeAttribute`.
+At the moment, Temporal Table Functions can only be used in joins.
+The above example was used to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---------------
+
+### Defining Temporal Table Function

Review comment: Isn't `define` more widely used in such contexts?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support enrichment joins in Flink SQL/Table API
> -----------------------------------------------
>
> Key: FLINK-9712
> URL:
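The versioned-lookup semantics described in the quoted doc — "the latest version of each row, per primary key, with rowtime up to the given timeAttribute" — can be modeled outside Flink. The following is a plain-Python illustration of what `Rates(timeAttribute)` returns, using the `RatesHistory` rows from the example; it is not Flink's implementation.

```python
# Model of a temporal table function: for a point in time t, return the
# latest row per primary key (currency) whose rowtime is <= t.
RATES_HISTORY = [
    # (rowtime, currency, rate) -- append-only, as in the RatesHistory table
    ("09:00", "US Dollar", 102),
    ("09:00", "Euro", 114),
    ("09:00", "Yen", 1),
    ("10:45", "Euro", 116),
    ("11:15", "Euro", 119),
]

def rates(time_attribute):
    """Latest version of each currency's rate as of time_attribute."""
    latest = {}
    for rowtime, currency, rate in RATES_HISTORY:
        if rowtime <= time_attribute:           # "HH:MM" strings compare correctly
            latest[currency] = (rowtime, rate)  # later rows overwrite older ones
    return latest

print(rates("10:15"))  # Euro is still 114 here
print(rates("11:00"))  # Euro was updated to 116 at 10:45
```

This reproduces the two sample queries in the doc: `Rates('10:15')` still sees the `09:00` Euro rate of `114`, while `Rates('11:00')` sees the `10:45` update to `116`.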
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630554#comment-16630554 ] ASF GitHub Bot commented on FLINK-9712:
---
pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220956370

## File path: docs/dev/table/streaming/temporal_tables.md
## @@ -0,0 +1,263 @@
+Processing time
+---------------
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

Review comment: I have added the `TemporalTableFunction` specific import. For the rest of the stuff we would need to provide a real code example.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection
[ https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630530#comment-16630530 ] ASF GitHub Bot commented on FLINK-10415:
---
zentol commented on issue #6763: [FLINK-10415] Fail response future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#issuecomment-425115999

Urgh, well that's unfortunate. We may want to increase the idle timeout to a larger value though (maybe 5 minutes?), as it effectively is an upper limit for the timeout that a user may specify.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> RestClient does not react to lost connection
> --------------------------------------------
>
> Key: FLINK-10415
> URL: https://issues.apache.org/jira/browse/FLINK-10415
> Project: Flink
> Issue Type: Bug
> Components: REST
> Affects Versions: 1.6.1, 1.7.0, 1.5.4
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not
> seem to react to lost connections in time. When sending a request to the
> current leader, it happened that the leader was killed just after establishing
> the connection. The {{RestClient}} then did not fail the connection and was
> stuck writing a request or retrieving a response from the lost leader. I'm
> wondering whether we should introduce a {{ReadTimeoutHandler}} and
> {{WriteTimeoutHandler}} to handle these problems.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
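The fix discussed in this PR — failing the pending response future when the connection closes, instead of letting the caller hang — can be sketched generically. This is not the RestClient's actual Netty pipeline code; it is a minimal Python model, and `PendingRequest` and its callback names are hypothetical.

```python
from concurrent.futures import Future

class ConnectionClosedException(Exception):
    """Raised into the response future when the peer goes away."""

class PendingRequest:
    """Tracks one in-flight REST request; the future must never hang forever."""

    def __init__(self):
        self.response_future = Future()

    def on_response(self, payload):
        if not self.response_future.done():
            self.response_future.set_result(payload)

    def on_connection_closed(self):
        # Without this hook, a killed leader leaves the caller blocked on the
        # future indefinitely -- the symptom described in FLINK-10415.
        if not self.response_future.done():
            self.response_future.set_exception(
                ConnectionClosedException("connection closed before response"))

req = PendingRequest()
req.on_connection_closed()  # simulate the peer dying mid-request
```

The comment about the idle timeout then makes sense: if an idle-timeout handler also closes the connection after N minutes of silence, N becomes an effective upper bound on any longer timeout a user configures, since the close fires the future's exception first.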
[jira] [Commented] (FLINK-10401) Port ProcessFailureCancelingITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630510#comment-16630510 ] ASF GitHub Bot commented on FLINK-10401:
---
tillrohrmann commented on issue #6749: [FLINK-10401] Port ProcessFailureCancelingITCase to new code base
URL: https://github.com/apache/flink/pull/6749#issuecomment-425111422

Thanks for the review @StefanRRichter. Addressing your comment and then merging this PR.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Port ProcessFailureCancelingITCase to new code base
> ---------------------------------------------------
>
> Key: FLINK-10401
> URL: https://issues.apache.org/jira/browse/FLINK-10401
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Port {{ProcessFailureCancelingITCase}} to new code base.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630506#comment-16630506 ] ASF GitHub Bot commented on FLINK-9712:
---
pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220942767

## File path: docs/dev/table/streaming/temporal_tables.md
## @@ -0,0 +1,263 @@
+Temporal Table Functions
+------------------------
+
+In order to access the data in the Temporal Table, one must define a time attribute for which the matching version of the table will be returned.

Review comment: Changed to:
> "one must pass a time attribute that determines the version of the table that will be returned."

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support enrichment joins in Flink SQL/Table API
> -----------------------------------------------
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Affects Versions: 1.5.0
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
> Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
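The correlated-subquery workaround quoted in this thread (`SUM(o.amount * r.rate)` with a `MAX(rowtime) <= o.rowtime` subquery) can be sanity-checked against the example data. Below is a hedged Python model of that query — an illustration of its semantics, not Flink's execution — using the `Orders` and `RatesHistory` rows from the quoted doc.

```python
# Orders and RatesHistory rows from the temporal_tables.md example.
ORDERS = [("10:15", 2, "Euro"), ("10:30", 1, "US Dollar"),
          ("10:32", 50, "Yen"), ("10:52", 3, "Euro"), ("11:04", 5, "US Dollar")]
RATES_HISTORY = [("09:00", "US Dollar", 102), ("09:00", "Euro", 114),
                 ("09:00", "Yen", 1), ("10:45", "Euro", 116), ("11:15", "Euro", 119)]

def rate_at(currency, order_rowtime):
    """Mirror of the correlated subquery: the rate whose rowtime is the
    MAX(rowtime) <= o.rowtime for the order's currency."""
    candidates = [(rt, rate) for rt, cur, rate in RATES_HISTORY
                  if cur == currency and rt <= order_rowtime]
    return max(candidates)[1]  # max by rowtime -> latest applicable rate

# SELECT SUM(o.amount * r.rate) AS amount ...
amount_in_yen = sum(amount * rate_at(currency, rowtime)
                    for rowtime, amount, currency in ORDERS)
print(amount_in_yen)
```

For instance, the `10:52` order of `3 Euro` picks the `10:45` rate of `116` (not the older `114` or the later `119`), which is exactly the enrichment-join behavior the Temporal Table Function is meant to express more simply.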
[jira] [Commented] (FLINK-10401) Port ProcessFailureCancelingITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630505#comment-16630505 ] ASF GitHub Bot commented on FLINK-10401:
---
tillrohrmann commented on a change in pull request #6749: [FLINK-10401] Port ProcessFailureCancelingITCase to new code base
URL: https://github.com/apache/flink/pull/6749#discussion_r220942743

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
## @@ -130,6 +131,13 @@ public T getDispatcher() {
 	}
 }
+	@VisibleForTesting
+	public WebMonitorEndpoint getWebMonitorEndpoint() {

Review comment: Yes, you're right. I've incorporated your feedback from #6743 and with that it should no longer be necessary to have this getter.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Port ProcessFailureCancelingITCase to new code base
> ---------------------------------------------------
>
> Key: FLINK-10401
> URL: https://issues.apache.org/jira/browse/FLINK-10401
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Port {{ProcessFailureCancelingITCase}} to new code base.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on a change in pull request #6741: [FLINK-9712][table, docs] Document processing time Temporal Table Joins
pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220942767

## File path: docs/dev/table/streaming/temporal_tables.md

## @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing, append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without using Temporal Tables, one would need to write a query like this:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+To define a Temporal Table, we must specify its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required, which determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in the Temporal Table, one must define a time attribute for which matching version of the table will be returned.

Review comment: Changed to: > "one must pass a time attribute that determines the version of the table that will be returned."
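The correlated subquery in the quoted documentation performs a point-in-time lookup: for each order, it selects the latest exchange rate whose `rowtime` is at or before the order's `rowtime`. A minimal sketch of that semantics in plain Python, using the sample data from the example above (this is an illustration of the join logic only, not Flink API code; the function names are hypothetical):

```python
# Point-in-time join sketch: for each order, use the latest exchange
# rate (per currency) whose rowtime is <= the order's rowtime.
# All rates are expressed with respect to Yen (which has rate 1).

orders = [  # (rowtime, amount, currency)
    ("10:15", 2, "Euro"),
    ("10:30", 1, "US Dollar"),
    ("10:32", 50, "Yen"),
    ("10:52", 3, "Euro"),
    ("11:04", 5, "US Dollar"),
]

rates_history = [  # (rowtime, currency, rate)
    ("09:00", "US Dollar", 102),
    ("09:00", "Euro", 114),
    ("09:00", "Yen", 1),
    ("10:45", "Euro", 116),
    ("11:15", "Euro", 119),
]

def rate_at(currency: str, rowtime: str) -> int:
    """Latest rate for `currency` whose rowtime is <= `rowtime`.

    This mirrors the correlated MAX(rowtime) subquery: the primary key
    (currency) selects the row set, the time attribute picks the version.
    """
    candidates = [
        (r_time, rate)
        for r_time, r_cur, rate in rates_history
        if r_cur == currency and r_time <= rowtime  # "HH:MM" strings compare correctly
    ]
    return max(candidates)[1]  # newest version wins

def total_in_yen() -> int:
    """SUM(o.amount * r.rate) over the point-in-time join."""
    return sum(amount * rate_at(currency, rowtime)
               for rowtime, amount, currency in orders)
```

For instance, the `10:52` order for `3 Euro` picks the `10:45` rate of `116` rather than the older `114`, which is exactly what the `MAX(rowtime) ... <= o.rowtime` condition expresses, and what a Temporal Table Function encapsulates.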
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630502#comment-16630502 ]

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220941093

## File path: docs/dev/table/streaming/temporal_tables.md

## @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing, append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without using Temporal Tables, one would need to write a query like this:

Review comment: Good point, it's also about efficiency.

> Support enrichment joins in Flink SQL/Table API
> -----------------------------------------------
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Affects Versions: 1.5.0
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
> Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10396) Remove codebase switch from MiniClusterResource
[ https://issues.apache.org/jira/browse/FLINK-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630491#comment-16630491 ]

ASF GitHub Bot commented on FLINK-10396:
---

tillrohrmann commented on a change in pull request #6748: [FLINK-10396] Remove CodebaseType URL: https://github.com/apache/flink/pull/6748#discussion_r220939564

## File path: flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala

## @@ -322,32 +320,20 @@ object ScalaShellITCase {
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = TestBaseUtils.isNewCodebase()
-    if (isNew) {
-      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-      // set to different than default so not to interfere with ScalaShellLocalStartupITCase
-      configuration.setInteger(RestOptions.PORT, 8082)
-      val miniConfig = new MiniClusterConfiguration.Builder()
-        .setConfiguration(configuration)
-        .setNumSlotsPerTaskManager(parallelism)
-        .build()
-
-      val miniCluster = new MiniCluster(miniConfig)
-      miniCluster.start()
-      port = miniCluster.getRestAddress.getPort
-      hostname = miniCluster.getRestAddress.getHost
-
-      cluster = Some(Left(miniCluster))
-    } else {
-      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-      val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-      hostname = standaloneCluster.getHostname
-      port = standaloneCluster.getPort
-
-      cluster = Some(Right(standaloneCluster))
-    }
+    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+    // set to different than default so not to interfere with ScalaShellLocalStartupITCase
+    configuration.setInteger(RestOptions.PORT, 8082)
+    val miniConfig = new MiniClusterConfiguration.Builder()
+      .setConfiguration(configuration)
+      .setNumSlotsPerTaskManager(parallelism)
+      .build()
+
+    val miniCluster = new MiniCluster(miniConfig)
+    miniCluster.start()
+    port = miniCluster.getRestAddress.getPort
+    hostname = miniCluster.getRestAddress.getHost
+
+    cluster = Some(Left(miniCluster))

Review comment: I don't think so. I will remove it and also remove the `StandaloneMiniCluster` at the same moment.

> Remove codebase switch from MiniClusterResource
> -----------------------------------------------
>
> Key: FLINK-10396
> URL: https://issues.apache.org/jira/browse/FLINK-10396
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: 0001-FLINK-10396-Remove-codebase-switch-in-UT-IT-tests.patch
>
> Remove the legacy codebase switch from {{MiniClusterResource}}.
[jira] [Commented] (FLINK-10396) Remove codebase switch from MiniClusterResource
[ https://issues.apache.org/jira/browse/FLINK-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16630494#comment-16630494 ]

ASF GitHub Bot commented on FLINK-10396:
---

tillrohrmann commented on a change in pull request #6748: [FLINK-10396] Remove CodebaseType URL: https://github.com/apache/flink/pull/6748#discussion_r220939564

## File path: flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala

## @@ -322,32 +320,20 @@ object ScalaShellITCase {
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = TestBaseUtils.isNewCodebase()
-    if (isNew) {
-      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-      // set to different than default so not to interfere with ScalaShellLocalStartupITCase
-      configuration.setInteger(RestOptions.PORT, 8082)
-      val miniConfig = new MiniClusterConfiguration.Builder()
-        .setConfiguration(configuration)
-        .setNumSlotsPerTaskManager(parallelism)
-        .build()
-
-      val miniCluster = new MiniCluster(miniConfig)
-      miniCluster.start()
-      port = miniCluster.getRestAddress.getPort
-      hostname = miniCluster.getRestAddress.getHost
-
-      cluster = Some(Left(miniCluster))
-    } else {
-      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-      val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-      hostname = standaloneCluster.getHostname
-      port = standaloneCluster.getPort
-
-      cluster = Some(Right(standaloneCluster))
-    }
+    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+    // set to different than default so not to interfere with ScalaShellLocalStartupITCase
+    configuration.setInteger(RestOptions.PORT, 8082)
+    val miniConfig = new MiniClusterConfiguration.Builder()
+      .setConfiguration(configuration)
+      .setNumSlotsPerTaskManager(parallelism)
+      .build()
+
+    val miniCluster = new MiniCluster(miniConfig)
+    miniCluster.start()
+    port = miniCluster.getRestAddress.getPort
+    hostname = miniCluster.getRestAddress.getHost
+
+    cluster = Some(Left(miniCluster))

Review comment: I don't think so. I will change it.

> Remove codebase switch from MiniClusterResource
> -----------------------------------------------
>
> Key: FLINK-10396
> URL: https://issues.apache.org/jira/browse/FLINK-10396
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: 0001-FLINK-10396-Remove-codebase-switch-in-UT-IT-tests.patch
>
> Remove the legacy codebase switch from {{MiniClusterResource}}.