[GitHub] flink pull request #5171: [FLINK-8271][Kinesis connector] upgrade deprecated...
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5171#discussion_r159362925

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
    @@ -30,37 +30,44 @@
     import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
     import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
     import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    -import com.amazonaws.regions.Region;
    +import com.amazonaws.client.builder.AwsClientBuilder;
     import com.amazonaws.regions.Regions;
    -import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.AmazonKinesis;
    +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
     
     import java.util.Properties;
     
     /**
      * Some utilities specific to Amazon Web Service.
      */
     public class AWSUtil {
     
    +	/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
    +	private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
     
     	/**
    -	 * Creates an Amazon Kinesis Client.
    +	 * Creates an AmazonKinesis client.
     	 * @param configProps configuration properties containing the access key, secret key, and region
    -	 * @return a new Amazon Kinesis Client
    +	 * @return a new AmazonKinesis client
     	 */
    -	public static AmazonKinesisClient createKinesisClient(Properties configProps) {
    +	public static AmazonKinesis createKinesisClient(Properties configProps) {
     		// set a Flink-specific user agent
    -		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
    -		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
    -			" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
    +		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
    +			.withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
    --- End diff --

    @tzulitai Check out [AWS source code here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/ClientConfiguration.java#L467): `setUserAgent` and `withUserAgentPrefix` both call `setUserAgentPrefix`.

---
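The distinction is only about how the string is assembled; the resulting user-agent prefix is identical. A minimal plain-Java sketch of the format constant from the patch (the version and commit id here are made-up placeholders, not values from `EnvironmentInformation`):

```java
public class UserAgentFormatDemo {

    // Same format constant as in the patch above.
    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";

    // Builds the user-agent prefix from a Flink version and a git commit id.
    static String userAgent(String version, String commitId) {
        return String.format(USER_AGENT_FORMAT, version, commitId);
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        System.out.println(userAgent("1.5-SNAPSHOT", "abc1234"));
        // prints: Apache Flink 1.5-SNAPSHOT (abc1234) Kinesis Connector
    }
}
```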
[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis
[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309184#comment-16309184 ]

ASF GitHub Bot commented on FLINK-8271:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5171#discussion_r159363827

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
    @@ -30,37 +30,44 @@
     import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
     import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
     import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    -import com.amazonaws.regions.Region;
    +import com.amazonaws.client.builder.AwsClientBuilder;
     import com.amazonaws.regions.Regions;
    -import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.AmazonKinesis;
    +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
     
     import java.util.Properties;
     
     /**
      * Some utilities specific to Amazon Web Service.
      */
     public class AWSUtil {
     
    +	/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
    +	private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
     
     	/**
    -	 * Creates an Amazon Kinesis Client.
    +	 * Creates an AmazonKinesis client.
     	 * @param configProps configuration properties containing the access key, secret key, and region
    -	 * @return a new Amazon Kinesis Client
    +	 * @return a new AmazonKinesis client
     	 */
    -	public static AmazonKinesisClient createKinesisClient(Properties configProps) {
    +	public static AmazonKinesis createKinesisClient(Properties configProps) {
     		// set a Flink-specific user agent
    -		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
    -		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
    -			" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
    +		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
    +			.withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
    +				EnvironmentInformation.getVersion(),
    +				EnvironmentInformation.getRevisionInformation().commitId));
     
     		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
    -		AmazonKinesisClient client = new AmazonKinesisClient(
    -			AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
    +		AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
    +			.withCredentials(AWSUtil.getCredentialsProvider(configProps))
    +			.withClientConfiguration(awsClientConfig)
    +			.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
     
    -		client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
     		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
    -			client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
    +			builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
    +				configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
    +				configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    --- End diff --

    @tzulitai You are right. After some research I found that the `region` field in `EndpointConfiguration` ([here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/client/builder/AwsClientBuilder.java#L363)) is used the same way as in the [old code here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/AmazonWebServiceClient.java#L368). I will set it to null.

    To take it a step further: if what you described is a frequent use case, we should add a unit test for it for future validation. Would you like to create a ticket and a PR for it?

> upgrade from deprecated classes to AmazonKinesis
> ------------------------------------------------
>
>                 Key: FLINK-8271
>                 URL: https://issues.apache.org/jira/browse/FLINK-8271
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.4.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.5.0, 1.4.1
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
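The endpoint-versus-region interplay discussed in this thread can be sketched roughly as follows. This is a non-authoritative sketch, not the final patch: `endpoint`, `region`, `credentialsProvider`, and `awsClientConfig` are placeholder parameters, and the sketch relies on the SDK rule that `withRegion` and `withEndpointConfiguration` are mutually exclusive on a client builder.

```java
static AmazonKinesis buildClient(String endpoint, String region,
        AWSCredentialsProvider credentialsProvider, ClientConfiguration awsClientConfig) {

    AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withClientConfiguration(awsClientConfig);

    if (endpoint != null) {
        // An explicit endpoint wins; a null signing region lets the SDK derive it
        // from the endpoint, mirroring the deprecated setEndpoint(String) behavior.
        builder.withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration(endpoint, null));
    } else {
        builder.withRegion(Regions.fromName(region));
    }
    return builder.build();
}
```

Requires the AWS SDK for Java (`aws-java-sdk-kinesis`) on the classpath; shown only to illustrate the branching the reviewers discuss.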
[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309169#comment-16309169 ]

ASF GitHub Bot commented on FLINK-8175:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5112

    @StephanEwen let me know if you have more concerns

> remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-8175
>                 URL: https://issues.apache.org/jira/browse/FLINK-8175
>             Project: Flink
>          Issue Type: Sub-task
>    Affects Versions: 1.5.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.5.0
>
> I propose removing flink-streaming-contrib from flink-contrib and migrating its classes to flink-streaming-java/scala, for the following reasons:
> - flink-streaming-contrib is so small that it has only 4 classes (3 Java and 1 Scala); they don't need a dedicated jar for Flink to distribute and maintain, nor for users to deal with the overhead of dependency management
> - the 4 classes in flink-streaming-contrib are logically more tied to flink-streaming-java/scala, and thus can be easily migrated
> - flink-contrib is already too crowded and noisy. It contains lots of submodules with different purposes, which confuses developers and users, and they lack a proper project hierarchy
>
> To go a step further, I would argue that even flink-contrib should be removed and all its submodules migrated to other top-level modules, for the following reasons: 1) Apache Flink as a whole is the result of contributions from many developers, so there's no reason to highlight some contributions in a dedicated module named 'contrib'; 2) flink-contrib inherently doesn't have a good hierarchy to hold submodules

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala
[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309170#comment-16309170 ]

ASF GitHub Bot commented on FLINK-8175:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5112

    @StephanEwen @tillrohrmann let me know if you have more concerns

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8217) Properly annotate APIs of flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-8217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309168#comment-16309168 ]

ASF GitHub Bot commented on FLINK-8217:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5138

    @tzulitai The PR has been updated

> Properly annotate APIs of flink-connector-kinesis
> -------------------------------------------------
>
>                 Key: FLINK-8217
>                 URL: https://issues.apache.org/jira/browse/FLINK-8217
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector
>    Affects Versions: 1.5.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.5.0
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7475) support update() in ListState
[ https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309167#comment-16309167 ]

ASF GitHub Bot commented on FLINK-7475:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4963

    @yunfan123 @aljoscha @StefanRRichter feedback is appreciated

> support update() in ListState
> -----------------------------
>
>                 Key: FLINK-7475
>                 URL: https://issues.apache.org/jira/browse/FLINK-7475
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core, DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: yf
>            Assignee: Bowen Li
>             Fix For: 1.5.0
>
> If I want to update the list, I have to do two steps:
>
>     listState.clear();
>     for (Element e : myList) {
>         listState.add(e);
>     }
>
> Why not update the state with:
>
>     listState.update(myList)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
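The proposed `update()` is simply clear-then-add-all in a single call. A minimal pure-Java model of the intended semantics (`ModelListState` is an invented illustration class, not Flink's `ListState`):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposed ListState#update(List) semantics.
class ModelListState<T> {

    private final List<T> contents = new ArrayList<>();

    void add(T value) { contents.add(value); }

    void clear() { contents.clear(); }

    // Proposed convenience: replace the whole list in one call,
    // equivalent to clear() followed by add() for each element.
    void update(List<T> newValues) {
        contents.clear();
        contents.addAll(newValues);
    }

    List<T> get() { return new ArrayList<>(contents); }
}
```

The one-call form both reads better and gives a state backend the chance to perform the replacement as a single write instead of a delete plus N appends.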
[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309048#comment-16309048 ]

Wei-Che Wei commented on FLINK-7935:
------------------------------------

Hi [~elevy]

Since FLINK-7692 was solved, users can use {{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} and {{MetricGroup#getAllVariables()}} to get the same name but with different tags on user-defined variables. Does that solve what you need in this issue, or is there any improvement we can still make here? Thank you.

> Metrics with user supplied scope variables
> ------------------------------------------
>
>                 Key: FLINK-7935
>                 URL: https://issues.apache.org/jira/browse/FLINK-7935
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.3.2
>            Reporter: Elias Levy
>
> We use DataDog for metrics. DD and Flink differ somewhat in how they track metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default the system scope for operator metrics is {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. {{taskmanager.job.operator}}, and they would be distinguished by their tag values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is possible to set the operator scope format to {{taskmanager.job.operator}}. We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work. The DataDog Flink metrics plugin submits all scope variables as tags, even if they are not used within the scope format. And it appears internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user-defined metrics, so that one can define variables/scopes when adding a metric group or metric with the user API. In DD we would then have a single metric with a tag taking many different values, rather than hundreds of metrics for the one value we want to measure across different event types.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
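For context, registering a user-defined metric under a sub-group in the DataStream API looks roughly like this. This is a hedged sketch using only `MetricGroup#addGroup(String)` and `counter(String)`; the operator and metric names are invented, and whether "MyMetrics" ends up as a name segment or a tag is up to the reporter, which is exactly the gap the issue describes:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Counts processed events under a user-defined metric sub-group.
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter eventCounter;

    @Override
    public void open(Configuration parameters) {
        // "MyMetrics.events" is appended to the operator's system scope;
        // a tag-aware reporter would have to expose user groups via scope
        // variables (MetricGroup#getAllVariables()) to get DataDog-style tags.
        eventCounter = getRuntimeContext()
            .getMetricGroup()
            .addGroup("MyMetrics")
            .counter("events");
    }

    @Override
    public String map(String value) {
        eventCounter.inc();
        return value;
    }
}
```

Requires the Flink runtime on the classpath; shown only to make concrete where user scope variables would plug in.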
[jira] [Created] (FLINK-8352) Flink UI Reports No Error on Job Submission Failures
Elias Levy created FLINK-8352:
---------------------------------

             Summary: Flink UI Reports No Error on Job Submission Failures
                 Key: FLINK-8352
                 URL: https://issues.apache.org/jira/browse/FLINK-8352
             Project: Flink
          Issue Type: Bug
          Components: Web Client
    Affects Versions: 1.4.0
            Reporter: Elias Levy

If you submit a job jar via the web UI and it raises an exception when started, the UI will report no error and will continue to show the animated image that makes it seem as if it is working. In addition, no error is printed in the logs unless the level is increased to at least DEBUG:

{noformat}
@40005a4c399202b87ebc DEBUG org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while handling request.
@40005a4c399202b8868c java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
@40005a4c399202b88a74 	at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:68)
@40005a4c399202b88e5c 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
@40005a4c399202b8e44c 	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
@40005a4c399202b8e44c 	at java.util.concurrent.FutureTask.run(Unknown Source)
@40005a4c399202b8e834 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
@40005a4c399202b8e834 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
@40005a4c399202b8f3ec 	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
@40005a4c399202b8f7d4 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
@40005a4c399202b8f7d4 	at java.lang.Thread.run(Unknown Source)
@40005a4c399202b8fbbc Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
@40005a4c399202b90b5c 	at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
@40005a4c399202b90f44 	at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
@40005a4c399202b90f44 	at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
@40005a4c399202b91afc 	at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:57)
@40005a4c399202b91afc 	... 8 more
@40005a4c399202b91ee4 Caused by: java.lang.ExceptionInInitializerError
@40005a4c399202b91ee4 	at com.cisco.sbg.amp.flink.ioc_engine.IocEngine.main(IocEngine.scala)
@40005a4c399202b922cc 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
@40005a4c399202b92a9c 	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92a9c 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92e84 	at java.lang.reflect.Method.invoke(Unknown Source)
@40005a4c399202b92e84 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
@40005a4c399202b9326c 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
@40005a4c399202b93a3c 	at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
@40005a4c399202b949dc 	... 11 more
@40005a4c399202b949dc Caused by: java.io.FileNotFoundException: /data/jenkins/jobs/XXX/workspace/target/scala-2.11/scoverage-data/scoverage.measurements.55 (No such file or directory)
@40005a4c399202b951ac 	at java.io.FileOutputStream.open0(Native Method)
@40005a4c399202b951ac 	at java.io.FileOutputStream.open(Unknown Source)
@40005a4c399202b9597c 	at java.io.FileOutputStream.<init>(Unknown Source)
@40005a4c399202b9597c 	at java.io.FileWriter.<init>(Unknown Source)
@40005a4c399202b95d64 	at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b95d64 	at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b9614c 	at scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901)
@40005a4c399202b9614c 	at scoverage.Invoker$.invoked(Invoker.scala:42)
@40005a4c399202b9691c 	at com.XXX$.<clinit>(IocEngine.scala:28)
@40005a4c399202b9691c 	at com.XXX$.<clinit>(IocEngine.scala)
{noformat}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-8351) Canonical way to extract FoldingState QueryableStateStream
Raymond Tay created FLINK-8351:
----------------------------------

             Summary: Canonical way to extract FoldingState QueryableStateStream
                 Key: FLINK-8351
                 URL: https://issues.apache.org/jira/browse/FLINK-8351
             Project: Flink
          Issue Type: Bug
          Components: Queryable State
    Affects Versions: 1.4.0
         Environment:
h3. Environment
* Local flink cluster version 1.4.0
* {{classloader.resolve-order: child-first}} in {{conf/flink-conf.yaml}}
* scala 2.11.11
* oracle jdk 1.8.0

h3. Library version
* akka actors 2.4.20
* akka http 10.0.10

            Reporter: Raymond Tay

I started experimenting with {{QueryableStateStream}} by referencing this page linked from Flink's website ⇒ https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html

I think the example given is perfectly fine and runs without much issue. However, I started extrapolating the situation to a {{FoldingState}} where the application continuously updates the state and another part of the application retrieves it, and I ran into the following:

* Is there a canonical way or API that allows the app to extract the submitted job id in order to query the current state?
** As a workaround I'm querying the URL {{localhost:8081/jobs}}, but this does not work when there are other jobs running.
* Is there another way to query the state other than polling? Is it possible to return the final result as part of the {{JobExecutionResult}}?
** At the moment the app polls {{localhost:8081/jobs}} and uses that {{JobID}} to query the state, but when the job completes its state is not persisted and I lose the _final_ state value.

I realize {{QueryableStateStream}} is in {{beta}} as of the time of this issue, so there are things I might have missed. Any help or tip is greatly appreciated ☺

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
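The polling described in the issue typically goes through the 1.4 queryable-state client, along these lines. This is a rough sketch under assumptions: the proxy host/port, the state name "query-name", the key, and passing the job id via {{args[0]}} are all invented for the example; the job id still has to be obtained out of band (e.g. from the REST API), which is exactly the gap the reporter is pointing at.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryDemo {

    public static void main(String[] args) throws Exception {
        // 9069 is the default queryable-state proxy port; adjust to your setup.
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);

        // The job id must come from outside, e.g. polling the REST API.
        JobID jobId = JobID.fromHexString(args[0]);

        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("sum", Long.class);

        CompletableFuture<ValueState<Long>> resultFuture = client.getKvState(
            jobId, "query-name", "some-key",
            BasicTypeInfo.STRING_TYPE_INFO, descriptor);

        // Blocks until the proxy answers; state is gone once the job finishes.
        System.out.println(resultFuture.get().value());
    }
}
```

Requires `flink-queryable-state-client-java` on the classpath; a `FoldingState` query follows the same pattern with a `FoldingStateDescriptor`.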
[jira] [Commented] (FLINK-8327) ClassLoader resolution of child-first does not appear to work
[ https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308983#comment-16308983 ]

Raymond Tay commented on FLINK-8327:
-------------------------------------

Yes, the fat-jar does contain Akka classes, and like yourself I naturally assumed that the configuration file was not present; but it turns out it was (I could run it via {{run}} while in {{sbt}} and print out the settings belonging to {{akka.http}}), hence I came to the conclusion that it _might_ be a class-loading problem. I'll need to investigate how Akka classloading works in this situation and will come back a little later this week.

Thanks for the suggestion; I'll try it out and see how things go, and I'll come back to this issue later today. Sbt is complicated in that there are multiple classloaders and I don't know the full story of how they work together.

> ClassLoader resolution of child-first does not appear to work
> -------------------------------------------------------------
>
>                 Key: FLINK-8327
>                 URL: https://issues.apache.org/jira/browse/FLINK-8327
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.4.0
>         Environment: h3. Environment
> * Local flink cluster version 1.4.0
> * {{classloader.resolve-order: child-first}} in {{conf/flink-conf.yaml}}
> * scala 2.11.11
> * oracle jdk 1.8.0
> h3. Library version
> * akka actors 2.4.20
> * akka http 10.0.10
>            Reporter: Raymond Tay
>
> h2. Description
> Was trying out the {{Queryable State}} and ran into a problem where the submitted job starts regular Akka actors and makes external HTTP calls via {{akka-http}} libraries, and the Flink runtime was complaining that it was not able to read the key {{akka.http}} (this key is held in the configuration file for {{akka-http}}).
> When I ran our app in the {{sbt}} shell locally, it was able to see the key {{akka.http}}, but when we submitted the fatjar (via {{sbt-assembly}}) to Flink, it was throwing the error message (see below). Looks like a classloader issue but I'm not sure.
> Any help is much appreciated!
> h2. Error message
> {noformat}
> Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.http'
> 	at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
> 	at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
> 	at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
> 	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
> 	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
> 	at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:258)
> 	at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:264)
> 	at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:37)
> 	at akka.http.scaladsl.Http$.createExtension(Http.scala:843)
> 	at akka.http.scaladsl.Http$.createExtension(Http.scala:719)
> 	at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:917)
> 	at akka.actor.ExtensionId$class.apply(Extension.scala:79)
> 	at akka.http.scaladsl.Http$.apply(Http.scala:838)
> 	at akka.http.scaladsl.Http$.apply(Http.scala:719)
> 	at org.example.state.A.<init>(Simple.scala:158)
> 	... 18 more
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Updated] (FLINK-8322) support getting number of existing timers in TimerService
[ https://issues.apache.org/jira/browse/FLINK-8322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-8322:
----------------------------
    Fix Version/s: 1.5.0

> support getting number of existing timers in TimerService
> ---------------------------------------------------------
>
>                 Key: FLINK-8322
>                 URL: https://issues.apache.org/jira/browse/FLINK-8322
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.5.0
>
> We have use cases where we want to use timers as scheduled threads, e.g. add a timer to wake up x hours later and do something (usually reap old data) only if there is no existing timer; basically we want at most one timer to exist per key at all times.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
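Until something like a timer-count query exists on `TimerService`, the at-most-one-timer pattern from the description is usually emulated with a flag in keyed state. A sketch of the workaround, not of any proposed API (class, state, and interval names are invented; it assumes use on a keyed stream):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Keeps at most one pending processing-time timer per key by tracking a flag in keyed state.
public class SingleTimerReaper extends ProcessFunction<String, String> {

    private static final long ONE_HOUR = 60 * 60 * 1000L;

    private transient ValueState<Boolean> timerPending;

    @Override
    public void open(Configuration parameters) {
        timerPending = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer-pending", Boolean.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (timerPending.value() == null) {
            // No timer outstanding for this key: schedule the wake-up.
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + ONE_HOUR);
            timerPending.update(true);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        timerPending.clear(); // the next element may schedule a fresh timer
        // ... reap old data here ...
    }
}
```

A native `numProcessingTimeTimers()`-style query would make the extra `ValueState` unnecessary, which is the improvement the ticket asks for.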
[jira] [Commented] (FLINK-8265) Missing jackson dependency for flink-mesos
[ https://issues.apache.org/jira/browse/FLINK-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308673#comment-16308673 ] ASF GitHub Bot commented on FLINK-8265: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5208 @aljoscha please review > Missing jackson dependency for flink-mesos > -- > > Key: FLINK-8265 > URL: https://issues.apache.org/jira/browse/FLINK-8265 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.4.0 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The Jackson library that is required by Fenzo is missing from the Flink > distribution jar-file. > This manifests as an exception in certain circumstances when a hard > constraint is configured ("mesos.constraints.hard.hostattribute"). > {code} > NoClassDefFoundError: > org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper > at com.netflix.fenzo.ConstraintFailure.(ConstraintFailure.java:35) > at > com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784) > at > com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581) > at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796) > at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8314) Add support for Kinesis Firehose
[ https://issues.apache.org/jira/browse/FLINK-8314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308524#comment-16308524 ] Bowen Li commented on FLINK-8314: - [~aljoscha] I believe this is unnecessary. First, Firehose (FH) is just a wrapper with Kinesis inside, to better integrate with AWS, e.g. S3. Second, users can create an FH and set Kinesis as its source, and Kinesis's data will automatically flow into FH. Based on the above two reasons, I believe there's no need to create an extra FH connector and maintain it in Flink. > Add support for Kinesis Firehose > > > Key: FLINK-8314 > URL: https://issues.apache.org/jira/browse/FLINK-8314 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector >Reporter: Ivan Mushketyk >Priority: Minor > > [Kinesis > Firehose|https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html] > is another product in the Kinesis family that allows a zero-ops scalable stream > for delivering streaming data to S3, RedShift, Elasticsearch, etc. It has a > different > [API|http://docs.aws.amazon.com/firehose/latest/APIReference/Welcome.html]. > I can contribute this myself, but I wonder if this feature makes sense to you. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8305) Docker port mapping doesn't work for JobManager RPC port on Official Flink image
[ https://issues.apache.org/jira/browse/FLINK-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308452#comment-16308452 ] Thomas Wozniakowski commented on FLINK-8305: Hi Aljoscha, Thanks for responding! I actually ended up working this out by banging my head against it for another day or so. You're right, using 1:1 port mappings does work, but the Docker images don't allow a deterministic way to override most internal port configuration in Local Cluster mode, so I've raised a Github PR to allow this: https://github.com/docker-flink/docker-flink/pull/33 > Docker port mapping doesn't work for JobManager RPC port on Official Flink > image > > > Key: FLINK-8305 > URL: https://issues.apache.org/jira/browse/FLINK-8305 > Project: Flink > Issue Type: Bug > Components: Docker >Affects Versions: 1.4.0, 1.3.2 > Environment: Mac OSX, Docker 17.09.1-ce-mac42 (21090) >Reporter: Thomas Wozniakowski > > With the images at: https://hub.docker.com/r/_/flink/. The JobManager RPC > port does not appear to be properly exposed via a standard docker port > mapping. > Starting a local cluster with: > {{docker run --name flink_local -p 32789:6123 -t -d flink local}} > gives you: > ||CONTAINER ID||IMAGE||COMMAND||CREATED||STATUS||PORTS||NAMES|| > |e2dd5f668f4f|flink|"/docker-entrypoin..."|6 minutes ago|Up 6 > minutes|8081/tcp, 0.0.0.0:32789->6123/tcp|flink_local| > Should allow submission of a job via command line with: > {{flink run -m 0.0.0.0:32789 > /usr/local/Cellar/apache-flink/1.3.2/libexec/examples/streaming/WordCount.jar}} > But this fails with "Couldn't retrieve the JobExecutionResult from the > JobManager". > Logging directly into the container with: > {{sudo docker exec -it /bin/bash}} > allows you to successfully start and complete job with: > {{flink run -m localhost:6123 /opt/flink/examples/streaming/WordCount.jar}} > What am I missing here? 
If the job can be successfully submitted to 6123 > locally, and that port is mapped to an external port on 0.0.0.0, then I > should be able to submit jobs to that port? I can't find any way to make this > work though. I've tried all variants of host name (localhost, 127.0.0.1, > 0.0.0.0) and none work. > I would hugely appreciate any help. > Thanks guys. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8279) Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp directories
[ https://issues.apache.org/jira/browse/FLINK-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308448#comment-16308448 ] ASF GitHub Bot commented on FLINK-8279: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5176 For the change regarding the JobManager use of a TaskManager configuration parameter, I created a separate issue and included the appropriate commits here (FLINK-8250) as this commit probably does not make too much sense without them. The commits contain some high-level description about the changes. > Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp > directories > - > > Key: FLINK-8279 > URL: https://issues.apache.org/jira/browse/FLINK-8279 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Currently, the BLOB server and cache processes (temporarily) stash incoming > files into their local file system in the directory given by the > {{blob.storage.directory}} configuration property. If this property is not > set or empty, it will fall back to {{java.io.tmpdir}}. > Instead, in a Mesos/YARN environment, we could use the temporary directories > they assigned to the Flink job which are not only the proper folder to use, > but may also offer some more space. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8350) replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components
Nico Kruber created FLINK-8350: -- Summary: replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components Key: FLINK-8350 URL: https://issues.apache.org/jira/browse/FLINK-8350 Project: Flink Issue Type: Improvement Components: Cluster Management, Configuration Affects Versions: 1.4.0 Reporter: Nico Kruber Assignee: Nico Kruber Currently, there is only a {{taskmanager.tmp.dirs}} configuration parameter which (if unset) is set to YARN/Mesos' application environment paths (the latter not quite yet). With FLINK-8279, we also used this as a fall-back for the BLOB caches and would like to use it for the BLOB server as well. This, however, does not reside on the TaskManager and it only makes sense to have a single temporary directory configuration parameter (if desired, this could be extended). I propose to change this to a more generic {{env.io.tmp.dirs}} used by all components, i.e. JobManager, JobMaster, Dispatcher, and all the TaskManager-related instances for both YARN and Mesos. TODO: set this value to the appropriate folders for the JobManager code paths during cluster deployment (this exists for the TaskManager only for now) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-8334) Elasticsearch Connector throwing java.lang.ClassNotFoundException: org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer
[ https://issues.apache.org/jira/browse/FLINK-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308330#comment-16308330 ] Bhaskar Divya edited comment on FLINK-8334 at 1/2/18 4:37 PM: -- Thanks [~aljoscha]. I think I got it working now.(no more failures on start of the Job) My POM was messed up trying to fix this issue. So, I re-created a fresh POM from the maven archetype of 1.4.0. I put the dependencies I required one by one. Also, There were issues with the Elasticsearch docker environment. For anybody looking at this, Please go through [this link|https://stackoverflow.com/questions/41192680/update-max-map-count-for-elasticsearch-docker-container-mac-host#41251595] to set vm.max_map_count And, set the following environment variables in docker container as below : http.host = 0.0.0.0 transport.host = 0.0.0.0 xpack.security.enabled = false Also, The POM which is working for me is 1) Add dependency in the outer dependencies block {noformat} org.apache.flink flink-connector-elasticsearch5_${scala.binary.version} ${flink.version} {noformat} 2) Follow the guidelines from [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/linking.html]. Add the provided configuration. Inside the I added the following for elasticsearch {noformat} org.apache.flink flink-connector-elasticsearch5_${scala.binary.version} 1.4.0 jar false ${project.build.directory}/classes org/apache/flink/** org.elasticsearch elasticsearch 5.1.2 jar false ${project.build.directory}/classes ** org.elasticsearch.plugin transport-netty3-client 5.1.2 jar false ${project.build.directory}/classes ** io.netty netty 3.10.6.Final jar false ${project.build.directory}/classes ** {noformat} Few of them are definitely extras and maybe not actually required. Hope it helps anyone trying the Elasticsearch connector with Docker. was (Author: bhaskardivya): Thanks [~aljoscha]. 
[jira] [Commented] (FLINK-8334) Elasticsearch Connector throwing java.lang.ClassNotFoundException: org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer
[ https://issues.apache.org/jira/browse/FLINK-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308330#comment-16308330 ] Bhaskar Divya commented on FLINK-8334: -- Thanks [~aljoscha]. I think I got it working now.(no more failures on start of the Job) My POM was messed up trying to fix this issue. So, I re-created a fresh POM from the maven archetype of 1.4.0. I put the dependencies I required one by one. Also, There were issues with the Elasticsearch docker environment. For anybody looking at this, Please go through [this link|https://stackoverflow.com/questions/41192680/update-max-map-count-for-elasticsearch-docker-container-mac-host#41251595] to set vm.max_map_count And, set the following environment variables in docker container as below : http.host = 0.0.0.0 transport.host = 0.0.0.0 xpack.security.enabled = false Also, The POM which is working for me is 1) Add dependency in the outer dependencies block {noformat} org.apache.flink flink-connector-elasticsearch5_${scala.binary.version} ${flink.version} {noformat} 2) Follow the guidelines from [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/linking.html]. Add the provided configuration. Inside the I added the following for elasticsearch {noformat} org.apache.flink flink-connector-elasticsearch5_${scala.binary.version} 1.4.0 jar false ${project.build.directory}/classes org/apache/flink/** org.elasticsearch elasticsearch 5.1.2 jar false ${project.build.directory}/classes ** org.elasticsearch.plugin transport-netty3-client 5.1.2 jar false ${project.build.directory}/classes ** io.netty netty 3.10.6.Final jar false ${project.build.directory}/classes ** {noformat} Few of them are definitely extras and maybe not actually required. Hope it helps anyone trying the Elasticsearch connector with Docker. 
> Elasticsearch Connector throwing java.lang.ClassNotFoundException: > org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer > -- > > Key: FLINK-8334 > URL: https://issues.apache.org/jira/browse/FLINK-8334 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.4.0 > Environment: Using Elasticsearch 5.1.2 in a docker environment > Flink is deployed on a different docker >Reporter: Bhaskar Divya > Labels: elasticsearch, netty > > I have a Elasticsearch sink configured. When a job is submitted, It goes into > fail status in a few seconds. > Following is the Exception from the Job screen: > {code:java} > java.lang.RuntimeException: Elasticsearch client is not connected to any > Elasticsearch nodes! > at > org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:80) > at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at >
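The `{noformat}` blocks in the comment above lost their XML tags in transit. Reconstructed, the first dependency block the commenter describes would look roughly like this (coordinates and the `${scala.binary.version}`/`${flink.version}` properties as stated in the comment; the surrounding structure is an assumption):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
```

The second block configures the maven-dependency-plugin to unpack the connector, `elasticsearch` 5.1.2, `transport-netty3-client` 5.1.2, and `netty` 3.10.6.Final into `${project.build.directory}/classes`; its exact XML cannot be fully recovered from the flattened text.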
[jira] [Commented] (FLINK-8265) Missing jackson dependency for flink-mesos
[ https://issues.apache.org/jira/browse/FLINK-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308328#comment-16308328 ] ASF GitHub Bot commented on FLINK-8265: --- Github user jaredstehler commented on the issue: https://github.com/apache/flink/pull/5208 looks good to me, thanks! > Missing jackson dependency for flink-mesos > -- > > Key: FLINK-8265 > URL: https://issues.apache.org/jira/browse/FLINK-8265 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.4.0 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The Jackson library that is required by Fenzo is missing from the Flink > distribution jar-file. > This manifests as an exception in certain circumstances when a hard > constraint is configured ("mesos.constraints.hard.hostattribute"). > {code} > NoClassDefFoundError: > org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper > at com.netflix.fenzo.ConstraintFailure.(ConstraintFailure.java:35) > at > com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784) > at > com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581) > at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796) > at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6206) As an Engineer, I want task state transition log to be warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308303#comment-16308303 ] Dan Bress commented on FLINK-6206: -- This is definitely relevant in 1.2.0, without this is very hard to detect situations when the system restarts due to an exception happening. > As an Engineer, I want task state transition log to be warn/error for FAILURE > scenarios > --- > > Key: FLINK-6206 > URL: https://issues.apache.org/jira/browse/FLINK-6206 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Dan Bress >Priority: Critical > > If a task fails due to an exception, I would like that to be logged at a warn > or an error level. currently its info > {code} > private boolean transitionState(ExecutionState currentState, ExecutionState > newState, Throwable cause) { > if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { > if (cause == null) { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState); > } else { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState, cause); > } > return true; > } else { > return false; > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
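The change the reporter asks for amounts to branching on whether a failure cause is present before choosing the log level. A self-contained sketch of that decision (class and method names are hypothetical; the real fix would call `LOG.warn`/`LOG.error` instead of `LOG.info` inside `transitionState`):

```java
/** Picks a log level for a task state transition, as FLINK-6206 suggests. */
public class TransitionLogging {
    /** A non-null cause means the transition is a failure; log it at WARN. */
    public static String levelFor(Throwable cause) {
        return (cause == null) ? "INFO" : "WARN";
    }
}
```

In `transitionState` this would replace the second `LOG.info(...)` call (the one that passes `cause`) with `LOG.warn(...)`, leaving normal transitions at INFO.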
[jira] [Closed] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-8272. --- Resolution: Not A Problem Fix Version/s: (was: 1.4.0) > 1.4 & Scala: Exception when querying the state > -- > > Key: FLINK-8272 > URL: https://issues.apache.org/jira/browse/FLINK-8272 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 > Environment: mac >Reporter: Juan Miguel Cejuela > > With Flink 1.3.2 and basically the same code except for the changes in the > API that happened in 1.4.0, I could query the state finely. > With 1.4.0 (& scala), I get the exception written below. Most important line: > {{java.util.concurrent.CompletionException: > java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds > writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}} > Somehow it seems that the connection to the underlying buffer fails. Perhaps > most informing is that, if I give a wrong {{JobId}} (i.e. one that is not > running), I still get the same error. So I guess no connection succeeds. > Also, to avoid possible serialization errors, I am just querying with a key > of type {{String}} and, for testing, a dummy & trivial {{case class}}. > I create the necessary types such as: > {code} > implicit val typeInformationString = createTypeInformation[String] > implicit val typeInformationDummy = createTypeInformation[Dummy] > implicit val valueStateDescriptorDummy = > new ValueStateDescriptor("state", typeInformationDummy)}} > {code} > And then query the like such as: > {code} > val javaCompletableFuture = client > .getKvState(this.flinkJobIdObject, this.queryableState, key, > typeInformationString, valueStateDescriptorDummy) > {code} > Does anybody have any clue on this? Let me know what other information I > should provide. 
> --- > Full stack trace: > {code} > java.util.concurrent.CompletionException: > java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds > writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) > at > org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) > at > org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at >
[jira] [Reopened] (FLINK-8272) 1.4 & Scala: Exception when querying the state
[ https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-8272: - Reopen to change resolution. > 1.4 & Scala: Exception when querying the state > -- > > Key: FLINK-8272 > URL: https://issues.apache.org/jira/browse/FLINK-8272 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 > Environment: mac >Reporter: Juan Miguel Cejuela > Fix For: 1.4.0 > > > With Flink 1.3.2 and basically the same code except for the changes in the > API that happened in 1.4.0, I could query the state finely. > With 1.4.0 (& scala), I get the exception written below. Most important line: > {{java.util.concurrent.CompletionException: > java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds > writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}} > Somehow it seems that the connection to the underlying buffer fails. Perhaps > most informing is that, if I give a wrong {{JobId}} (i.e. one that is not > running), I still get the same error. So I guess no connection succeeds. > Also, to avoid possible serialization errors, I am just querying with a key > of type {{String}} and, for testing, a dummy & trivial {{case class}}. > I create the necessary types such as: > {code} > implicit val typeInformationString = createTypeInformation[String] > implicit val typeInformationDummy = createTypeInformation[Dummy] > implicit val valueStateDescriptorDummy = > new ValueStateDescriptor("state", typeInformationDummy)}} > {code} > And then query the like such as: > {code} > val javaCompletableFuture = client > .getKvState(this.flinkJobIdObject, this.queryableState, key, > typeInformationString, valueStateDescriptorDummy) > {code} > Does anybody have any clue on this? Let me know what other information I > should provide. 
> --- > Full stack trace: > {code} > java.util.concurrent.CompletionException: > java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds > writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0) > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432) > at > org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505) > at > org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at >
[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails
[ https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308293#comment-16308293 ] Aljoscha Krettek commented on FLINK-8282: - In my opinion, there is no good separation of concerns between the {{StreamTask}} and the {{StreamOperator}}. {{AbstractStreamOperator}} does a lot of things that a stream operator shouldn't do and does things in a very specific way that {{StreamTask}} and other components expect to be done in this way and things go haywire if the operator doesn't behave that way. Off the top of my head this includes everything that happens in {{setup()}}, i.e. metrics setup and configuration stuff, everything that happens in the various state initialisation methods and snapshot/restore methods, and the latency marker stuff. There is actually this (somewhat old) issue: FLINK-4859. > Transformation with TwoInputStreamOperator fails > > > Key: FLINK-8282 > URL: https://issues.apache.org/jira/browse/FLINK-8282 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Timo Walther > > The following program fails because of multiple reasons (see exceptions > below). The transformation with a {{TwoInputStreamOperator}} does not extend > {{AbstractStreamOperator}}. I think this is the main cause why it fails. > Either we fix the exceptions or we check for {{AbstractStreamOperator}} first. 
> {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream ds1 = env.addSource(new > SourceFunction() { > @Override > public void run(SourceContext ctx) throws > Exception { > ctx.emitWatermark(new Watermark(100L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > DataStream ds2 = env.addSource(new > SourceFunction() { > @Override > public void run(SourceContext ctx) throws > Exception { > ctx.emitWatermark(new Watermark(200L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > ds1.connect(ds2.broadcast()).transform("test", Types.INT, new > TwoInputStreamOperator() { > @Override > public void processElement1(StreamRecord > element) throws Exception { > System.out.println(); > } > @Override > public void processElement2(StreamRecord > element) throws Exception { > System.out.println(); > } > @Override > public void processWatermark1(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > public void processWatermark2(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > public void processLatencyMarker1(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void processLatencyMarker2(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void setup(StreamTask containingTask, > StreamConfig config, Output output) { > } > @Override > public void open() throws Exception { > } > @Override > public void close() throws Exception { > } > @Override > public void dispose() throws Exception { > } > @Override > public OperatorSnapshotResult snapshotState(long > checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws > Exception { > return null; > } >
[jira] [Commented] (FLINK-8284) Custom metrics not being exposed for Prometheus
[ https://issues.apache.org/jira/browse/FLINK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308280#comment-16308280 ] Aljoscha Krettek commented on FLINK-8284: - [~Zentol] Could you please have a look at this? > Custom metrics not being exposed for Prometheus > --- > > Key: FLINK-8284 > URL: https://issues.apache.org/jira/browse/FLINK-8284 > Project: Flink > Issue Type: Bug > Components: Documentation, Metrics >Affects Versions: 1.4.0 > Environment: Linux/CentOS 7 >Reporter: Julio Biason > > Following the documentation, we changed our filter that removes events with > missing fields to a RichFilterFunction, so we can capture metrics about such > events: > {code:scala} > public class MissingClientFilter extends RichFilterFunction { > private transient Counter counter; > @Override > public void open(Configuration config) { > this.counter = getRuntimeContext() > .getMetricGroup() > .addGroup("events") > .counter("missingClient"); > } > @Override > public boolean filter(LineData line) { > String client = line.get("client").toString(); > boolean missing = client.trim().equals(""); > if (!missing) { > this.count(); > } > return !missing; > } > private void count() { > if (this.counter != null) { > this.counter.inc(); > } > } > } > {code} > We also added Prometheus as our reporter: > {noformat} > metrics.reporters: prom > metrics.reporter.prom.port: 9105 > metrics.reporter.prom.class: > org.apache.flink.metrics.prometheus.PrometheusReporter > {noformat} > The problem is accessing port 9105 display all Flink metrics, but not ours. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8305) Docker port mapping doesn't work for JobManager RPC port on Official Flink image
[ https://issues.apache.org/jira/browse/FLINK-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308269#comment-16308269 ] Aljoscha Krettek commented on FLINK-8305: - This is a known issue based on a "feature" of Akka: Akka connections only work if the client connects to the server with exactly the same address that the server "thinks" it has. I think you can work around this by adding {{EXPOSE 6123}} to your Dockerfile, using exactly that port, and connecting from an environment that can use the hostname of the JobManager.
> Docker port mapping doesn't work for JobManager RPC port on Official Flink image
>
> Key: FLINK-8305
> URL: https://issues.apache.org/jira/browse/FLINK-8305
> Project: Flink
> Issue Type: Bug
> Components: Docker
> Affects Versions: 1.4.0, 1.3.2
> Environment: Mac OSX, Docker 17.09.1-ce-mac42 (21090)
> Reporter: Thomas Wozniakowski
>
> With the images at: https://hub.docker.com/r/_/flink/. The JobManager RPC port does not appear to be properly exposed via a standard docker port mapping.
> Starting a local cluster with:
> {{docker run --name flink_local -p 32789:6123 -t -d flink local}}
> gives you:
> ||CONTAINER ID||IMAGE||COMMAND||CREATED||STATUS||PORTS||NAMES||
> |e2dd5f668f4f|flink|"/docker-entrypoin..."|6 minutes ago|Up 6 minutes|8081/tcp, 0.0.0.0:32789->6123/tcp|flink_local|
> Should allow submission of a job via command line with:
> {{flink run -m 0.0.0.0:32789 /usr/local/Cellar/apache-flink/1.3.2/libexec/examples/streaming/WordCount.jar}}
> But this fails with "Couldn't retrieve the JobExecutionResult from the JobManager".
> Logging directly into the container with:
> {{sudo docker exec -it /bin/bash}}
> allows you to successfully start and complete a job with:
> {{flink run -m localhost:6123 /opt/flink/examples/streaming/WordCount.jar}}
> What am I missing here?
> If the job can be successfully submitted to 6123 locally, and that port is mapped to an external port on 0.0.0.0, then I should be able to submit jobs to that port? I can't find any way to make this work though. I've tried all variants of host name (localhost, 127.0.0.1, 0.0.0.0) and none work.
> I would hugely appreciate any help.
> Thanks guys.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
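The workaround from the comment above can be sketched as a Dockerfile fragment plus a matching 1:1 port mapping, so the address the client dials is exactly the one the JobManager advertises. This is an illustration of the idea only (the hostname placeholder and the unchanged port mapping are assumptions, not a verified fix):

{noformat}
# Dockerfile fragment: expose the JobManager RPC port
EXPOSE 6123
{noformat}

{noformat}
# Publish 6123 unchanged and submit using the JobManager's hostname
docker run --name flink_local -p 6123:6123 -p 8081:8081 -t -d flink local
flink run -m <jobmanager-hostname>:6123 WordCount.jar
{noformat}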
[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308243#comment-16308243 ] ASF GitHub Bot commented on FLINK-8332: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159249258 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- nit: `JobID.fromHexString(jobIdString)` > Move dispose savepoint into ClusterClient > - > > Key: FLINK-8332 > URL: https://issues.apache.org/jira/browse/FLINK-8332 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality we should move it to the > {{ClusterClient}}. That way we can have different implementations of the > {{ClusterClient}} (Flip-6 and old code) which are used by the same > {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159249258 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- nit: `JobID.fromHexString(jobIdString)` ---
[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308233#comment-16308233 ] ASF GitHub Bot commented on FLINK-8332: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159248959 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- `JobID.fromHexString` > Move dispose savepoint into ClusterClient > - > > Key: FLINK-8332 > URL: https://issues.apache.org/jira/browse/FLINK-8332 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality we should move it to the > {{ClusterClient}}. That way we can have different implementations of the > {{ClusterClient}} (Flip-6 and old code) which are used by the same > {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5219#discussion_r159248959 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -659,128 +655,107 @@ protected int savepoint(String[] args) { return 0; } - if (options.isDispose()) { - // Discard - return disposeSavepoint(options); - } else { - // Trigger - String[] cleanedArgs = options.getArgs(); - JobID jobId; + CustomCommandLine customCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (cleanedArgs.length >= 1) { - String jobIdString = cleanedArgs[0]; - try { - jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { - return handleArgException(new IllegalArgumentException( - "Error: The value for the Job ID is not a valid ID.")); - } + ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + + try { + if (options.isDispose()) { + // Discard + return disposeSavepoint(clusterClient, options.getSavepointPath()); } else { - return handleArgException(new IllegalArgumentException( + // Trigger + String[] cleanedArgs = options.getArgs(); + JobID jobId; + + if (cleanedArgs.length >= 1) { + String jobIdString = cleanedArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); --- End diff -- `JobID.fromHexString` ---
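The suggestion in this diff, replacing `new JobID(StringUtils.hexStringToByte(jobIdString))` with `JobID.fromHexString(jobIdString)`, centralizes parsing and validation in one factory method. A self-contained sketch of what such a factory does follows; `HexJobId` and its methods are hypothetical stand-ins, not Flink's actual `JobID` implementation:

```java
// Sketch of the parsing the review suggests centralizing: turning a
// 32-character hex job ID into its 16 raw bytes, with validation up front.
public class HexJobId {
    private final byte[] bytes;

    private HexJobId(byte[] bytes) {
        this.bytes = bytes;
    }

    public static HexJobId fromHexString(String hex) {
        if (hex == null || hex.length() != 32) {
            throw new IllegalArgumentException("Job ID must be 32 hex characters: " + hex);
        }
        byte[] out = new byte[16];
        for (int i = 0; i < 16; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Not a hex string: " + hex);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return new HexJobId(out);
    }

    public byte[] getBytes() {
        return bytes.clone();
    }
}
```

The benefit over manual `hexStringToByte` plus a constructor call is that every caller gets the same `IllegalArgumentException` on malformed input instead of each call site rolling its own error handling.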
[jira] [Commented] (FLINK-8314) Add support for Kinesis Firehose
[ https://issues.apache.org/jira/browse/FLINK-8314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308230#comment-16308230 ] Aljoscha Krettek commented on FLINK-8314: - So this would essentially be another connector that is not exactly like Kinesis but in the same ecosystem? [~tzulitai] and [~phoenixjiangnan], what do you think?
> Add support for Kinesis Firehose
>
> Key: FLINK-8314
> URL: https://issues.apache.org/jira/browse/FLINK-8314
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector
> Reporter: Ivan Mushketyk
> Priority: Minor
>
> [Kinesis Firehose|https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html] is another product in the Kinesis family that provides a zero-ops, scalable stream for delivering streaming data to S3, Redshift, Elasticsearch, etc. It has a different [API|http://docs.aws.amazon.com/firehose/latest/APIReference/Welcome.html].
> I can contribute this myself, but I wonder if this feature makes sense to you.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8315) Use AWSCredentialsProvider to get AWS credentials
[ https://issues.apache.org/jira/browse/FLINK-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308225#comment-16308225 ] Aljoscha Krettek commented on FLINK-8315: - I think this makes sense, but I'm not too familiar with the Kinesis connector. [~tzulitai] and [~phoenixjiangnan], what do you think about this?
> Use AWSCredentialsProvider to get AWS credentials
>
> Key: FLINK-8315
> URL: https://issues.apache.org/jira/browse/FLINK-8315
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector
> Reporter: Ivan Mushketyk
> Priority: Minor
>
> Instead of providing credentials like:
> {code:java}
> Properties consumerConfig = new Properties();
> consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
> consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "...");
> consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "...");
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
> {code}
> the Kinesis connector could use the AWSCredentialsProvider interface.
> I can contribute it myself, but I wonder if this feature makes sense to you.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
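The shape of the proposal can be sketched in plain Java: instead of forcing callers to bake raw key strings into a `Properties` object, let them plug in any credentials source behind an interface. `CredentialsProvider` and the property keys below are hypothetical stand-ins, not the AWS SDK's or Flink's real names:

```java
import java.util.Properties;

// Sketch of the two configuration styles discussed in FLINK-8315.
public class CredentialsSketch {

    // Hypothetical stand-in for the AWS SDK's AWSCredentialsProvider.
    public interface CredentialsProvider {
        String accessKeyId();
        String secretAccessKey();
    }

    /** Today's style: raw key strings baked into the consumer config. */
    public static Properties fromRawKeys(String accessKey, String secretKey) {
        Properties config = new Properties();
        config.put("aws.accessKeyId", accessKey);
        config.put("aws.secretKey", secretKey);
        return config;
    }

    /** Proposed style: the provider decides where credentials come from
     *  (environment variables, profile files, instance metadata, ...). */
    public static Properties fromProvider(CredentialsProvider provider) {
        return fromRawKeys(provider.accessKeyId(), provider.secretAccessKey());
    }
}
```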
[GitHub] flink pull request #5192: [FLINK-8257] [conf] Unify the value checks for set...
Github user xccui closed the pull request at: https://github.com/apache/flink/pull/5192 ---
[jira] [Commented] (FLINK-8257) Unify the value checks for setParallelism()
[ https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308219#comment-16308219 ] ASF GitHub Bot commented on FLINK-8257: --- Github user xccui closed the pull request at: https://github.com/apache/flink/pull/5192 > Unify the value checks for setParallelism() > --- > > Key: FLINK-8257 > URL: https://issues.apache.org/jira/browse/FLINK-8257 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: Xingcan Cui >Assignee: Xingcan Cui > Fix For: 1.5.0 > > > The {{setParallelism()}} method exists in many components at different > levels. Some of the methods require the input value to be greater than {{1}} > (e.g., {{StreamTransformation.setParallelism()}}), while some of them also > allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is > {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the > value checks for these methods. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308217#comment-16308217 ] Aljoscha Krettek edited comment on FLINK-8318 at 1/2/18 3:28 PM: - Do you have a specific reason for using {{parent-first}}? I think your case should work if you include ES and Jackson in your user jar and use {{child-first}}, which was introduced for exactly such cases of dependency clashes. was (Author: aljoscha): Do you have a specific reason for using `parent-first`. I think you case should work if you include ES and Jackson in your user jar and use `child-first`, which was introduced for exactly such cases of dependency clashes. > Conflict jackson library with ElasticSearch connector > - > > Key: FLINK-8318 > URL: https://issues.apache.org/jira/browse/FLINK-8318 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector, Startup Shell Scripts >Affects Versions: 1.4.0 >Reporter: Jihyun Cho > > My flink job is failed after update flink version to 1.4.0. It uses > ElasticSearch connector. > I'm using CDH Hadoop with Flink option "classloader.resolve-order: > parent-first" > The failure log is below. 
> {noformat} > Using the result of 'hadoop classpath' to augment the Hadoop classpath: > /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//* > 2017-12-26 14:13:21,160 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > > 2017-12-26 14:13:21,161 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Starting > TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC) > 2017-12-26 14:13:21,161 INFO > org.apache.flink.runtime.taskmanager.TaskManager - OS current > user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Current > Hadoop/Kerberos user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM: Java > HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11 > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap > size: 31403 MiBytes > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: > (not set) > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Hadoop > version: 2.6.5 > 2017-12-26 14:13:21,448 INFO > 
org.apache.flink.runtime.taskmanager.TaskManager - JVM Options: > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xms32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xmx32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -XX:MaxDirectMemorySize=8388607T > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/ > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Program > Arguments: > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > --configDir > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > /home/www/service/flink-1.4.0/conf > 2017-12-26 14:13:21,449 INFO >
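For reference, the classloading switch discussed in this thread is a single flink-conf.yaml entry; in Flink 1.4 {{child-first}} is already the default, so the suggested workaround amounts to removing the {{parent-first}} override (a sketch of the relevant fragment):

{noformat}
# flink-conf.yaml: resolve user-code classes child-first so the user jar's
# Jackson/Elasticsearch versions win over those on the Hadoop classpath
classloader.resolve-order: child-first
{noformat}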
[jira] [Commented] (FLINK-8257) Unify the value checks for setParallelism()
[ https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308218#comment-16308218 ] ASF GitHub Bot commented on FLINK-8257: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5192 Thanks for the review, @aljoscha . I'll close this PR. > Unify the value checks for setParallelism() > --- > > Key: FLINK-8257 > URL: https://issues.apache.org/jira/browse/FLINK-8257 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: Xingcan Cui >Assignee: Xingcan Cui > Fix For: 1.5.0 > > > The {{setParallelism()}} method exists in many components at different > levels. Some of the methods require the input value to be greater than {{1}} > (e.g., {{StreamTransformation.setParallelism()}}), while some of them also > allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is > {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the > value checks for these methods. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308217#comment-16308217 ] Aljoscha Krettek commented on FLINK-8318: - Do you have a specific reason for using `parent-first`. I think you case should work if you include ES and Jackson in your user jar and use `child-first`, which was introduced for exactly such cases of dependency clashes. > Conflict jackson library with ElasticSearch connector > - > > Key: FLINK-8318 > URL: https://issues.apache.org/jira/browse/FLINK-8318 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector, Startup Shell Scripts >Affects Versions: 1.4.0 >Reporter: Jihyun Cho > > My flink job is failed after update flink version to 1.4.0. It uses > ElasticSearch connector. > I'm using CDH Hadoop with Flink option "classloader.resolve-order: > parent-first" > The failure log is below. > {noformat} > Using the result of 'hadoop classpath' to augment the Hadoop classpath: > /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//* > 2017-12-26 14:13:21,160 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > > 2017-12-26 14:13:21,161 INFO > 
org.apache.flink.runtime.taskmanager.TaskManager - Starting > TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC) > 2017-12-26 14:13:21,161 INFO > org.apache.flink.runtime.taskmanager.TaskManager - OS current > user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Current > Hadoop/Kerberos user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM: Java > HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11 > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap > size: 31403 MiBytes > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: > (not set) > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Hadoop > version: 2.6.5 > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM Options: > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xms32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xmx32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -XX:MaxDirectMemorySize=8388607T > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/ > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Program > Arguments: > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > --configDir > 2017-12-26 14:13:21,449 
INFO > org.apache.flink.runtime.taskmanager.TaskManager - > /home/www/service/flink-1.4.0/conf > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Classpath: > ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:... > > 2017-12-26 14:14:01,393 INFO org.apache.flink.runtime.taskmanager.Task >
[GitHub] flink issue #5192: [FLINK-8257] [conf] Unify the value checks for setParalle...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/5192 Thanks for the review, @aljoscha. I'll close this PR. ---
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308212#comment-16308212 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r159247245 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java --- @@ -27,11 +27,16 @@ import java.util.concurrent.TimeoutException; /** - * Utility functions for Flink's RPC implementation + * Utility functions for Flink's RPC implementation. */ public class RpcUtils { - public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE); + /** +* HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 2.4.20). The value cannot be +* higher or an {@link IllegalArgumentException} will be thrown during an RPC. Check the private +* method {@code checkMaxDelay()} in {@link akka.actor.LightArrayRevolverScheduler}. +*/ + public static final Time INF_TIMEOUT = Time.milliseconds(TimeUnit.SECONDS.toMillis(21474835)); --- End diff -- `Time.seconds(21474835)` > Enable Triggering of Savepoints via RestfulGateway > -- > > Key: FLINK-8317 > URL: https://issues.apache.org/jira/browse/FLINK-8317 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Enable triggering of savepoints in FLIP-6 mode via RestfulGateway: > * Add method to {{CompletableFuture > triggerSavepoint(long timestamp, String targetDirectory)}} to interface > * Implement method in {{Dispatcher}} and {{JobMaster}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5223: [FLINK-8317][flip6] Implement Triggering of Savepo...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r159247245 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java --- @@ -27,11 +27,16 @@ import java.util.concurrent.TimeoutException; /** - * Utility functions for Flink's RPC implementation + * Utility functions for Flink's RPC implementation. */ public class RpcUtils { - public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE); + /** +* HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 2.4.20). The value cannot be +* higher or an {@link IllegalArgumentException} will be thrown during an RPC. Check the private +* method {@code checkMaxDelay()} in {@link akka.actor.LightArrayRevolverScheduler}. +*/ + public static final Time INF_TIMEOUT = Time.milliseconds(TimeUnit.SECONDS.toMillis(21474835)); --- End diff -- `Time.seconds(21474835)` ---
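The 21474835-second constant in this diff can be sanity-checked: assuming Akka's default 10 ms scheduler tick, {{LightArrayRevolverScheduler}} rejects delays whose tick count exceeds {{Integer.MAX_VALUE}}, which caps delays at roughly {{Integer.MAX_VALUE}} * 10 ms. The constant stays just below that ceiling. A quick derivation (the tick duration is an assumption about the default {{akka.scheduler.tick-duration}}):

```java
import java.util.concurrent.TimeUnit;

// Sketch: derive the ceiling that the INF_TIMEOUT constant must stay under.
public class AkkaMaxDelaySketch {

    // Assumed default tick duration of Akka's LightArrayRevolverScheduler.
    static final long TICK_MILLIS = 10;

    /** Largest whole number of seconds whose tick count still fits in an int. */
    static long maxDelaySeconds() {
        long maxMillis = (long) Integer.MAX_VALUE * TICK_MILLIS; // max ticks * tick size
        return TimeUnit.MILLISECONDS.toSeconds(maxMillis);
    }
}
```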
[jira] [Commented] (FLINK-8322) support getting number of existing timers in TimerService
[ https://issues.apache.org/jira/browse/FLINK-8322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308201#comment-16308201 ] Aljoscha Krettek commented on FLINK-8322: - Yes, I think one (maybe the best) solution is to keep an extra bit of state for keeping track of the number of timers. {{num*Timers()}} will probably get more expensive once we store timers not only on the heap but for example in RocksDB. (See FLINK-5544) > support getting number of existing timers in TimerService > - > > Key: FLINK-8322 > URL: https://issues.apache.org/jira/browse/FLINK-8322 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li > > We have use cases where we want to use timers as scheduled threads - e.g. > add a timer to wake up x hours later and do something (usually reap old data) > only if there are no existing timers; basically we want at most one timer > to exist per key at all times -- This message was sent by Atlassian JIRA (v6.4.14#64029)
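The "at most one timer per key" pattern from the issue can be approximated today by tracking the count alongside the timers, as the comment suggests. A stand-alone sketch follows; a plain `HashMap` stands in for Flink's keyed `ValueState` (which real code would use so the count survives checkpoints), and the actual timer registration is only indicated in a comment:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: keep our own per-key timer count, since TimerService has no numTimers().
public class TimerCountSketch {

    // Stand-in for per-key ValueState<Integer> in a KeyedProcessFunction.
    private final Map<String, Integer> timerCount = new HashMap<>();

    /** Registers a timer for the key only if none is pending; returns true if registered. */
    public boolean registerIfAbsent(String key) {
        int current = timerCount.getOrDefault(key, 0);
        if (current > 0) {
            return false; // a timer is already pending for this key
        }
        timerCount.put(key, current + 1);
        // Real code would now call ctx.timerService().registerEventTimeTimer(...).
        return true;
    }

    /** To be called from onTimer(): the pending timer for the key has fired. */
    public void onTimerFired(String key) {
        timerCount.merge(key, -1, Integer::sum);
    }
}
```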
[jira] [Commented] (FLINK-8326) CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't use the correct parameter to trigger test function
[ https://issues.apache.org/jira/browse/FLINK-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308198#comment-16308198 ] ASF GitHub Bot commented on FLINK-8326: --- Github user tony810430 closed the pull request at: https://github.com/apache/flink/pull/5211 > CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't > use the correct parameter to trigger test function > > > Key: FLINK-8326 > URL: https://issues.apache.org/jira/browse/FLINK-8326 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.5.0 > > > {{CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut()}} > didn't use the correct parameter, so that it only tests scale-in. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308196#comment-16308196 ] ASF GitHub Bot commented on FLINK-8332: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5219 This PR is based on #5216 > Move dispose savepoint into ClusterClient > - > > Key: FLINK-8332 > URL: https://issues.apache.org/jira/browse/FLINK-8332 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{CliFrontend}} sends the command for disposing a savepoint. > In order to better abstract this functionality we should move it to the > {{ClusterClient}}. That way we can have different implementations of the > {{ClusterClient}} (Flip-6 and old code) which are used by the same > {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5211: [FLINK-8326] [checkpoints] CheckpointCoordinatorTe...
Github user tony810430 closed the pull request at: https://github.com/apache/flink/pull/5211 ---
[GitHub] flink issue #5219: [FLINK-8332] [flip6] Move savepoint dispose into ClusterC...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5219 This PR is based on #5216 ---
[jira] [Commented] (FLINK-8326) CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't use the correct parameter to trigger test function
[ https://issues.apache.org/jira/browse/FLINK-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308191#comment-16308191 ] ASF GitHub Bot commented on FLINK-8326: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5211 Thanks a lot for catching this! I merged, could you please close this PR? > CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't > use the correct parameter to trigger test function > > > Key: FLINK-8326 > URL: https://issues.apache.org/jira/browse/FLINK-8326 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Minor > > > {{CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut()}} > didn't use the correct parameter, so that it only tests scale-in. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8326) CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't use the correct parameter to trigger test function
[ https://issues.apache.org/jira/browse/FLINK-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-8326. --- Resolution: Fixed Fix Version/s: 1.5.0 Fixed on master in e20536586aa21ad7cfdb805b259827b2a15e3c92 > CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't > use the correct parameter to trigger test function > > > Key: FLINK-8326 > URL: https://issues.apache.org/jira/browse/FLINK-8326 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Minor > Fix For: 1.5.0 > > > {{CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut()}} > didn't use the correct parameter, so that it only tests scale-in. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5211: [FLINK-8326] [checkpoints] CheckpointCoordinatorTest#test...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5211 Thanks a lot for catching this! I merged, could you please close this PR? ---
[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308185#comment-16308185 ] ASF GitHub Bot commented on FLINK-8329: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5216#discussion_r159243765 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -143,31 +138,48 @@ private YarnConfigOptions.UserJarInclusion userJarInclusion; public AbstractYarnClusterDescriptor( - org.apache.flink.configuration.Configuration flinkConfiguration, - String configurationDirectory) { + Configuration flinkConfiguration, + String configurationDirectory, + YarnClient yarnClient) { + + yarnConfiguration = new YarnConfiguration(); + // for unit tests only if (System.getenv("IN_TESTS") != null) { try { - conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL()); + yarnConfiguration.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL()); --- End diff -- I think it's safer to use `new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml")` > Move YarnClient out of YarnClusterClient > > > Key: FLINK-8329 > URL: https://issues.apache.org/jira/browse/FLINK-8329 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Move the {{YarnClient}} from the {{YarnClusterClient}} to the > {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle > management of the {{YarnClient}}. This change is a clean up task which will > better structure the client code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308184#comment-16308184 ] ASF GitHub Bot commented on FLINK-8329: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5216#discussion_r159242946 --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java --- @@ -78,6 +83,9 @@ public void testPerJobMode() { jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph); + + clusterClient.shutdown(); + yarnClusterDescriptorV2.close(); --- End diff -- Closing in finally or try-with-resource not needed? > Move YarnClient out of YarnClusterClient > > > Key: FLINK-8329 > URL: https://issues.apache.org/jira/browse/FLINK-8329 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Move the {{YarnClient}} from the {{YarnClusterClient}} to the > {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle > management of the {{YarnClient}}. This change is a clean up task which will > better structure the client code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308186#comment-16308186 ] ASF GitHub Bot commented on FLINK-8329: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5216#discussion_r159243077 --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java --- @@ -272,6 +275,7 @@ public void testJavaAPI() throws Exception { LOG.info("Shutting down cluster. All tests passed"); // shutdown cluster yarnCluster.shutdown(); + clusterDescriptor.close(); --- End diff -- Closing in finally or try-with-resource not needed? > Move YarnClient out of YarnClusterClient > > > Key: FLINK-8329 > URL: https://issues.apache.org/jira/browse/FLINK-8329 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Move the {{YarnClient}} from the {{YarnClusterClient}} to the > {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle > management of the {{YarnClient}}. This change is a clean up task which will > better structure the client code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYa...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5216#discussion_r159243077 --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java --- @@ -272,6 +275,7 @@ public void testJavaAPI() throws Exception { LOG.info("Shutting down cluster. All tests passed"); // shutdown cluster yarnCluster.shutdown(); + clusterDescriptor.close(); --- End diff -- Closing in finally or try-with-resource not needed? ---
[GitHub] flink pull request #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYa...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5216#discussion_r159243765 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -143,31 +138,48 @@ private YarnConfigOptions.UserJarInclusion userJarInclusion; public AbstractYarnClusterDescriptor( - org.apache.flink.configuration.Configuration flinkConfiguration, - String configurationDirectory) { + Configuration flinkConfiguration, + String configurationDirectory, + YarnClient yarnClient) { + + yarnConfiguration = new YarnConfiguration(); + // for unit tests only if (System.getenv("IN_TESTS") != null) { try { - conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL()); + yarnConfiguration.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL()); --- End diff -- I think it's safer to use `new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml")` ---
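The point about the two-argument `File` constructor can be seen in a stand-alone sketch. This is not Flink code; the helper name and paths below are invented for illustration. `new File(parent, child)` normalizes separators, so a trailing slash in `YARN_CONF_DIR` is harmless, and the calling code no longer hard-codes `/`:

```java
import java.io.File;

public class YarnConfPathDemo {

    // Hypothetical helper mirroring the review suggestion: build the
    // path with the two-argument File constructor instead of string
    // concatenation like dir + "/yarn-site.xml".
    static File yarnSiteFile(String confDir) {
        return new File(confDir, "yarn-site.xml");
    }

    public static void main(String[] args) {
        // With or without a trailing separator, the resolved path is
        // identical, because File normalizes the parent path.
        System.out.println(yarnSiteFile("/etc/hadoop/conf").getPath());
        System.out.println(yarnSiteFile("/etc/hadoop/conf/").getPath());
    }
}
```

Both calls print `/etc/hadoop/conf/yarn-site.xml` on a Unix-like system, which is why the two-argument form is the safer choice here.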
[GitHub] flink pull request #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYa...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5216#discussion_r159242946 --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java --- @@ -78,6 +83,9 @@ public void testPerJobMode() { jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph); + + clusterClient.shutdown(); + yarnClusterDescriptorV2.close(); --- End diff -- Closing in finally or try-with-resource not needed? ---
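The try-with-resources pattern GJL is asking for guarantees `close()` runs even when the test body throws, unlike a plain trailing `close()` call. A minimal sketch with a stand-in class (`FakeClusterDescriptor` is invented here; in the PR, `AbstractYarnClusterDescriptor` would play its role):

```java
public class TryWithResourcesDemo {

    // Invented stand-in for an AutoCloseable cluster descriptor.
    static class FakeClusterDescriptor implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    static boolean runAndClose() {
        FakeClusterDescriptor descriptor = new FakeClusterDescriptor();
        try (FakeClusterDescriptor d = descriptor) {
            // deployment and assertions would go here; close() runs
            // even if this block throws an exception
        }
        return descriptor.closed;
    }

    public static void main(String[] args) {
        System.out.println(runAndClose()); // prints "true"
    }
}
```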
[jira] [Commented] (FLINK-8327) ClassLoader resolution of child-first does not appear to work
[ https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308180#comment-16308180 ] Aljoscha Krettek commented on FLINK-8327: - So the fat-jar contains the Akka classes? I think the problem is that the configuration file is not included in the fat-jar. For Akka these files should be called {{reference.conf}} and I think you can include them in your fat-jar via this (Maven) configuration: {code} reference.conf {code} Unfortunately, I don't know how to do that using sbt but you can try manually adding the {{reference.conf}} file from the {{akka-http}} jar to your jar by unzipping and re-zipping it with that file. > ClassLoader resolution of child-first does not appear to work > - > > Key: FLINK-8327 > URL: https://issues.apache.org/jira/browse/FLINK-8327 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 > Environment: h3. Environment > * Local flink cluster version 1.4.0 > * {{classloader.resolve-order: child-first}} in {{conf/flink-conf.yaml}}. > * scala 2.11.11 > * oracle jdk 1.8.0 > h3. Library version > * akka actors 2.4.20 > * akka http 10.0.10 >Reporter: Raymond Tay > > h2. Description > Was trying out the {{Queryable State}} and ran into a problem where the > submitted job starts regular Akka actors and making external HTTP calls via > {{akka-http}} libraries and the flink runtime was complaining that it was not > able to read the key {{akka.http}} (this key is held in the configuration > file for {{akka-http}}). > When i ran our app on the {{sbt}} shell locally, it was able to see the key > {{akka.http}} but when we submitted the fatjar (via {{sbt-assembly}}) to > flink, it was throwing the error message (see below). Looks like a class > loader issue but i'm not sure. > Any help is much appreciated ! > h2. 
Error message > {noformat} > Caused by: com.typesafe.config.ConfigException$Missing: No configuration > setting found for key 'akka.http' > at > com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152) > at > com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170) > at > com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189) > at > com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:258) > at > com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:264) > at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:37) > at akka.http.scaladsl.Http$.createExtension(Http.scala:843) > at akka.http.scaladsl.Http$.createExtension(Http.scala:719) > at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:917) > at akka.actor.ExtensionId$class.apply(Extension.scala:79) > at akka.http.scaladsl.Http$.apply(Http.scala:838) > at akka.http.scaladsl.Http$.apply(Http.scala:719) > at org.example.state.A.(Simple.scala:158) > ... 18 more > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
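The Maven snippet in the comment above was reduced to bare `reference.conf` by the archive's HTML rendering. The configuration most likely meant is the shade plugin's `AppendingTransformer`, which concatenates the `reference.conf` files of all Akka modules into the fat-jar instead of letting one jar's copy overwrite the others. This is a reconstruction of the standard snippet, not the original comment text:

```xml
<!-- Inside maven-shade-plugin's <configuration> block: -->
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>reference.conf</resource>
  </transformer>
</transformers>
```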
[jira] [Commented] (FLINK-8334) Elasticsearch Connector throwing java.lang.ClassNotFoundException: org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer
[ https://issues.apache.org/jira/browse/FLINK-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308168#comment-16308168 ] Aljoscha Krettek commented on FLINK-8334: - Could you check if your jar files contain any netty classes? This could be a clash in dependencies or Maven might be excluding stuff since it "thinks" it comes provided with Flink. > Elasticsearch Connector throwing java.lang.ClassNotFoundException: > org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer > -- > > Key: FLINK-8334 > URL: https://issues.apache.org/jira/browse/FLINK-8334 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.4.0 > Environment: Using Elasticsearch 5.1.2 in a docker environment > Flink is deployed on a different docker >Reporter: Bhaskar Divya > Labels: elasticsearch, netty > > I have a Elasticsearch sink configured. When a job is submitted, It goes into > fail status in a few seconds. > Following is the Exception from the Job screen: > {code:java} > java.lang.RuntimeException: Elasticsearch client is not connected to any > Elasticsearch nodes! 
> at > org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:80) > at > org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > {code} > In the logs, Following stack trace is shown. > {code} > 2018-01-01 12:15:14,432 INFO > org.elasticsearch.client.transport.TransportClientNodesService - failed to > get node info for > {#transport#-1}{8IZTMPcSRCyKRynhfyN2fA}{192.168.99.100}{192.168.99.100:9300}, > disconnecting... 
> NodeDisconnectedException[[][192.168.99.100:9300][cluster:monitor/nodes/liveness] > disconnected] > 2018-01-01 12:15:19,433 ERROR org.elasticsearch.transport.netty3.Netty3Utils > - fatal error on the network layer > at > org.elasticsearch.transport.netty3.Netty3Utils.maybeDie(Netty3Utils.java:195) > at > org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:82) > at > org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) > at > org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) > at > org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377) > at > org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) > at > org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) > at > org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:291) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151) > at > org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:292) > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391) > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) > at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) > at > 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) > at > org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >
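The suggested check, whether the job jar bundles any Netty classes, is usually a one-liner such as `jar tf my-job.jar | grep -i netty`, but it can also be done programmatically. The sketch below builds a throwaway jar with a fake Netty entry and scans it; all entry and class names are invented for the demo:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

public class JarScan {

    // Return all entries of a jar whose path mentions "netty".
    static List<String> nettyEntries(File jar) throws IOException {
        List<String> hits = new ArrayList<>();
        try (ZipFile zf = new ZipFile(jar)) {
            Enumeration<? extends ZipEntry> entries = zf.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.toLowerCase().contains("netty")) {
                    hits.add(name);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway jar containing one fake Netty class entry.
        File jar = File.createTempFile("demo", ".jar");
        try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jar))) {
            out.putNextEntry(new ZipEntry("org/jboss/netty/Dummy.class"));
            out.closeEntry();
            out.putNextEntry(new ZipEntry("com/example/Job.class"));
            out.closeEntry();
        }
        System.out.println(nettyEntries(jar));
    }
}
```

A hit for `org/jboss/netty/...` in a job jar would confirm that Netty 3 classes are bundled, which is what the dependency-clash hypothesis predicts.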
[jira] [Created] (FLINK-8349) Remove Yarn specific commands from YarnClusterClient
Till Rohrmann created FLINK-8349: Summary: Remove Yarn specific commands from YarnClusterClient Key: FLINK-8349 URL: https://issues.apache.org/jira/browse/FLINK-8349 Project: Flink Issue Type: Sub-task Components: Client Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 The {{YarnClusterClient}} should no longer use Yarn specific commands. This is necessary to make the {{FlinkYarnSessionCli}} work with other {{ClusterClient}} implementations than the {{YarnClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159238089 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -98,7 +99,13 @@ // late arriving event OutputTag private static final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-output") {}; - private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception { + private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception { --- End diff -- Yes. Previously the restoring tests were working on a `closed` harness, which only worked because the test harness itself wasn't properly closing itself ---
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308151#comment-16308151 ] ASF GitHub Bot commented on FLINK-8268: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159238089 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -98,7 +99,13 @@ // late arriving event OutputTag private static final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-output") {}; - private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception { + private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception { --- End diff -- Yes. Previously the restoring tests were working on a `closed` harness, which only worked because the test harness itself wasn't properly closing itself > Test instability for 'TwoPhaseCommitSinkFunctionTest' > - > > Key: FLINK-8268 > URL: https://issues.apache.org/jira/browse/FLINK-8268 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: Piotr Nowojski >Priority: Critical > Labels: test-stability > > The following exception / failure message occurs. > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< > FAILURE! - in > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest > testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) > Time elapsed: 0.068 sec <<< ERROR! > java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1). 
> at java.io.FileOutputStream.writeBytes(Native Method) > at java.io.FileOutputStream.write(FileOutputStream.java:326) > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291) > at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295) > at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) > at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) > at java.io.BufferedWriter.flush(BufferedWriter.java:254) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8119) Cannot submit jobs to YARN Session in FLIP-6 mode
[ https://issues.apache.org/jira/browse/FLINK-8119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308146#comment-16308146 ] ASF GitHub Bot commented on FLINK-8119: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5234 [FLINK-8119] [flip6] Wire correct Flip6 components in Flip6YarnClusterDescriptor ## What is the purpose of the change Let the Flip6YarnClusterDescriptor create a RestClusterClient as ClusterClient. Moreover, this commit makes the YarnResourceManager register under the REST port at Yarn. This PR is based on #5233. ## Brief change log - Implement unsupported methods in `RestClusterClient` - Register `YarnResourceManager` under REST port at Yarn - Let `Flip6YarnClusterDescriptor` return a `RestClusterClient` ## Verifying this change - Tested manually ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink implementRestClusterClientMethods Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5234 commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b Author: Till RohrmannDate: 2017-12-07T12:57:24Z [FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns. commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 192adb786a48d19b71e797355500652d51de6296 Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. commit a73b6e2850d4b2445a835914ba570d1057e59dfb Author: Till Rohrmann Date: 2018-01-02T06:42:18Z [FLINK-8333] [flip6] Separate deployment options from command options This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package. 
commit 57df8f9e1c5f4015cd6543ded43849a71003ef36 Author: Till Rohrmann Date: 2018-01-02T06:59:34Z [FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend This commit changes how CustomCommandLines are registered at the CliFrontend. Henceforth, the CliFrontend is initialized with the set of CustomCommandLines instead of registering them statically. This improves maintainability and testability. commit 2ee8059b949b4a9c2e165f3e78716ae531e5c0e7 Author: Till Rohrmann Date: 2018-01-02T08:22:12Z [FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor Instead of directly retrieving or deploying a Flink cluster, the CustomCommandLine now only returns a ClusterDescriptor which can be used for these operations. This disentangles the ClusterDescriptor and the CustomCommandLine a bit better supporting a proper lifecycle management of the former. commit 9ff62c7cfadc977d02f414ee26fae6b76a568eda Author: Till Rohrmann Date: 2018-01-02T08:55:13Z [FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine Since the
[GitHub] flink pull request #5234: [FLINK-8119] [flip6] Wire correct Flip6 components...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5234 [FLINK-8119] [flip6] Wire correct Flip6 components in Flip6YarnClusterDescriptor ## What is the purpose of the change Let the Flip6YarnClusterDescriptor create a RestClusterClient as ClusterClient. Moreover, this commit makes the YarnResourceManager register under the REST port at Yarn. This PR is based on #5233. ## Brief change log - Implement unsupported methods in `RestClusterClient` - Register `YarnResourceManager` under REST port at Yarn - Let `Flip6YarnClusterDescriptor` return a `RestClusterClient` ## Verifying this change - Tested manually ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink implementRestClusterClientMethods Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5234.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5234 commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b Author: Till RohrmannDate: 2017-12-07T12:57:24Z [FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli. 
This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns. commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 192adb786a48d19b71e797355500652d51de6296 Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. commit a73b6e2850d4b2445a835914ba570d1057e59dfb Author: Till Rohrmann Date: 2018-01-02T06:42:18Z [FLINK-8333] [flip6] Separate deployment options from command options This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package. commit 57df8f9e1c5f4015cd6543ded43849a71003ef36 Author: Till Rohrmann Date: 2018-01-02T06:59:34Z [FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend This commit changes how CustomCommandLines are registered at the CliFrontend. Henceforth, the CliFrontend is initialized with the set of CustomCommandLines instead of registering them statically. This improves maintainability and testability. commit 2ee8059b949b4a9c2e165f3e78716ae531e5c0e7 Author: Till Rohrmann Date: 2018-01-02T08:22:12Z [FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor Instead of directly retrieving or deploying a Flink cluster, the CustomCommandLine now only returns a ClusterDescriptor which can be used for these operations. 
This disentangles the ClusterDescriptor and the CustomCommandLine a bit better supporting a proper lifecycle management of the former. commit 9ff62c7cfadc977d02f414ee26fae6b76a568eda Author: Till Rohrmann Date: 2018-01-02T08:55:13Z [FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine Since the Configuration does not change over the lifetime of a CustomCommandLine, we can safely pass it as a constructor argument instead of method argument. commit 2b6fda525fb737a76b055510f8081003010237ed Author: Till Rohrmann
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308134#comment-16308134 ] ASF GitHub Bot commented on FLINK-8268: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159221102 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Utility class to simulate in memory file like writes, flushes and closing. 
+ */ +public class ContentDump { + private boolean writable = true; + private Map<String, List<String>> filesContent = new HashMap<>(); + + public Set<String> listFiles() { + return filesContent.keySet(); + } + + public void setWritable(boolean writable) { + this.writable = writable; + } + + /** +* Creates an empty file. +*/ + public ContentWriter createWriter(String name) { + checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name); + filesContent.put(name, new ArrayList<>()); + return new ContentWriter(name, this); + } + + public static void move(String name, ContentDump source, ContentDump target) { + Collection<String> content = source.read(name); + try (ContentWriter contentWriter = target.createWriter(name)) { + contentWriter.write(content).flush(); + } + source.delete(name); + } + + public void delete(String name) { + filesContent.remove(name); + } + + public Collection<String> read(String name) { + List<String> content = filesContent.get(name); + checkState(content != null, "Unknown file [%s]", name); + List<String> result = new ArrayList<>(content); + content.clear(); + return result; + } + + private void putContent(String name, List<String> values) { + List<String> content = filesContent.get(name); + checkState(content != null, "Unknown file [%s]", name); + if (!writable) { + throw new NotWritableException(name); + } + content.addAll(values); + } + + /** +* {@link ContentWriter} represents an abstraction that allows to putContent to the {@link ContentDump}. 
+*/ + public static class ContentWriter implements AutoCloseable { + private final ContentDump contentDump; + private final String name; + private final List buffer = new ArrayList<>(); + private boolean closed = false; + + private ContentWriter(String name, ContentDump contentDump) { + this.name = checkNotNull(name); + this.contentDump = checkNotNull(contentDump); + } + + public String getName() { + return name; + } + + public ContentWriter write(String value) { + if (closed) { + throw new IllegalStateException(); + } + buffer.add(value); + return this; + } + + public ContentWriter write(Collection values) { + values.forEach(this::write); + return this; +
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159229650

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---
@@ -492,6 +503,10 @@ public void close() throws Exception {
 			processingTimeService.shutdownService();
 		}
 		setupCalled = false;
+
+		if (internalEnvironment.isPresent()) {
--- End diff --

@tzulitai, @GJL is correct: `Environment` is not `Closeable`/`AutoCloseable`. I was also afraid that someone might be reusing some external environment, but I think that's not the case. Using an `environmentIsInternal` flag would require some casting; that's why I chose `Optional`. +1 for `internalEnvironment.ifPresent(MockEnvironment::close);`

---
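The pattern agreed on above — tracking a harness-owned environment in an `Optional` so that only internally created environments get closed — can be sketched as follows. `MockEnv` and `Harness` are hypothetical stand-ins for illustration, not the actual Flink classes.

```java
import java.util.Optional;

public class OwnedEnvironmentSketch {

    // Hypothetical stand-in for Flink's MockEnvironment (which is closeable,
    // unlike the plain Environment interface discussed in the review).
    static class MockEnv implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Hypothetical stand-in for the test harness: the Optional is empty when
    // the caller supplied an external environment, and present when the
    // harness created (and therefore owns) one.
    static class Harness {
        private final Optional<MockEnv> internalEnvironment;

        Harness(Optional<MockEnv> internalEnvironment) {
            this.internalEnvironment = internalEnvironment;
        }

        void close() {
            // Only an internally created environment is closed, as suggested
            // in the review comment; an external one is left to its owner.
            internalEnvironment.ifPresent(MockEnv::close);
        }
    }
}
```

The `Optional` makes the ownership question explicit at the single place it matters, without a separate boolean flag plus casting.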
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159235730

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java ---
@@ -42,22 +41,32 @@ public class SourceFunctionUtil {
 	public static List runSourceFunction(SourceFunction sourceFunction) throws Exception {
-		final List outputs = new ArrayList();
 		if (sourceFunction instanceof RichFunction) {
+			return runRichSourceFunction(sourceFunction);
+		}
+		else {
+			return runNonRichSourceFunction(sourceFunction);
+		}
+	}
+
+	private static List runRichSourceFunction(SourceFunction sourceFunction) throws Exception {
+		try (MockEnvironment environment = new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024)) {
 			AbstractStreamOperator operator = mock(AbstractStreamOperator.class);
 			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
-			RuntimeContext runtimeContext = new StreamingRuntimeContext(
-				operator,
-				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				new HashMap());
-
+			RuntimeContext runtimeContext = new StreamingRuntimeContext(
+				operator,
+				environment,
+				new HashMap<>());
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
-			((RichFunction) sourceFunction).open(new Configuration());
+
+			return runNonRichSourceFunction(sourceFunction);
--- End diff --

It wouldn't suffice, because the rich function's `MockEnvironment` must be closed. It would have to be something like:
```
Optional<MockEnvironment> context = Optional.empty();
try {
	if (sourceFunction instanceof RichFunction) {
		context = setupRichSourceFunction(sourceFunction);
	}
	return runSourceFunction(sourceFunction);
} finally {
	context.ifPresent(MockEnvironment::close);
}
```
which, in my opinion, is uglier :(

---
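The trade-off debated above — an `Optional` tracked across a `try`/`finally` versus splitting the rich and non-rich paths so the rich path can use plain try-with-resources — can be contrasted with a generic closeable resource. `Resource` and the two run methods are illustrative stand-ins, not Flink's `SourceFunctionUtil`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class CleanupStylesSketch {

    // Illustrative stand-in for MockEnvironment; counts how often it is closed.
    static class Resource implements AutoCloseable {
        static final AtomicInteger closes = new AtomicInteger();

        @Override
        public void close() {
            closes.incrementAndGet();
        }
    }

    // Shape quoted in the review: track an optionally created resource and
    // close it in finally, in one combined method.
    static String optionalFinallyStyle(boolean rich) {
        Optional<Resource> context = Optional.empty();
        try {
            if (rich) {
                context = Optional.of(new Resource());
            }
            return "ran";
        } finally {
            context.ifPresent(Resource::close);
        }
    }

    // Shape the patch uses instead: a dedicated rich path that can rely on
    // plain try-with-resources, so no Optional bookkeeping is needed.
    static String tryWithResourcesStyle(boolean rich) {
        if (!rich) {
            return "ran";
        }
        try (Resource environment = new Resource()) {
            return "ran";
        }
    }
}
```

Both shapes guarantee the resource is closed exactly once on the rich path; the split-method version trades one extra method for a simpler cleanup story.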
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308135#comment-16308135 ] ASF GitHub Bot commented on FLINK-8268:

Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159221837

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java ---
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308132#comment-16308132 ] ASF GitHub Bot commented on FLINK-8268:

Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r15950

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java ---
@@ -145,19 +145,7 @@ public MemoryManager getMemoryManager() {
 	}

 	@After
-	public void shutdownIOManager() throws Exception {
-		this.mockEnv.getIOManager().shutdown();
-		Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
-	}
-
-	@After
-	public void shutdownMemoryManager() throws Exception {
-		if (this.memorySize > 0) {
--- End diff --

No, it didn't matter, so I allowed myself to simplify this shutdown. But good catch. I was thinking about doing this simplification in a separate commit so that there would be no need to "catch" this behaviour change.

> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -----------------------------------------------------
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Stephan Ewen
> Assignee: Piotr Nowojski
> Priority: Critical
> Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< FAILURE! - in org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) Time elapsed: 0.068 sec <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
> 	at java.io.FileOutputStream.writeBytes(Native Method)
> 	at java.io.FileOutputStream.write(FileOutputStream.java:326)
> 	at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> 	at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
> 	at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
> 	at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
> 	at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
> 	at java.io.BufferedWriter.flush(BufferedWriter.java:254)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
> 	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308133#comment-16308133 ] ASF GitHub Bot commented on FLINK-8268: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159229650 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java --- @@ -492,6 +503,10 @@ public void close() throws Exception { processingTimeService.shutdownService(); } setupCalled = false; + + if (internalEnvironment.isPresent()) { --- End diff -- @tzulitai, @GJL is correct, `Environment` is not `Closeable`/`AutoCloseable`. Also I was afraid that someone might be reusing some external environment, but I think that's not the case. Using `environmentIsInternal` flag would require some casting, that's why I have chosen `Optional`. +1 for `internalEnvironment.ifPresent(MockEnvironment::close);` > Test instability for 'TwoPhaseCommitSinkFunctionTest' > - > > Key: FLINK-8268 > URL: https://issues.apache.org/jira/browse/FLINK-8268 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: Piotr Nowojski >Priority: Critical > Labels: test-stability > > The following exception / failure message occurs. > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< > FAILURE! - in > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest > testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) > Time elapsed: 0.068 sec <<< ERROR! > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). 
> at java.io.FileOutputStream.writeBytes(Native Method) > at java.io.FileOutputStream.write(FileOutputStream.java:326) > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291) > at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295) > at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) > at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) > at java.io.BufferedWriter.flush(BufferedWriter.java:254) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159220879

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java ---
+	public Collection<String> read(String name) {
+		List<String> content = filesContent.get(name);
+		checkState(content != null, "Unknown file [%s]", name);
+		List<String> result = new ArrayList<>(content);
+		content.clear();
--- End diff --

Hmm, I was thinking more in a manner of reading from a file descriptor, where subsequent reads return only new data. But now that I think about it, that would be the more expected behaviour for some `ContentReader` class, and dropping `content.clear()` here makes more sense.

---
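The two read semantics under discussion — the destructive read above, where subsequent reads return only new data, versus the snapshot read obtained by dropping `content.clear()` — can be contrasted in a small sketch. The method names are mine, not from the patch.

```java
import java.util.ArrayList;
import java.util.List;

public class ReadSemanticsSketch {

    // Destructive read, like the quoted read(): returns the accumulated
    // content and clears it, so a second read only sees data written in
    // between (file-descriptor-like behaviour).
    static List<String> destructiveRead(List<String> content) {
        List<String> result = new ArrayList<>(content);
        content.clear();
        return result;
    }

    // Snapshot read, the behaviour after dropping content.clear(): every
    // read returns a copy of the full contents, leaving them in place.
    static List<String> snapshotRead(List<String> content) {
        return new ArrayList<>(content);
    }
}
```

For a test utility that asserts on the final contents of a "file", the snapshot variant is the less surprising default, which matches the conclusion of the comment.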
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308131#comment-16308131 ] ASF GitHub Bot commented on FLINK-8268:

Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159224671

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---
@@ -179,9 +179,6 @@ public void testFailBeforeNotify() throws Exception {
 		harness.initializeState(snapshot);

 		assertExactlyOnce(Arrays.asList("42", "43"));
-
-		closeTestHarness();
-
-		assertEquals(0, tmpDirectory.listFiles().size());
--- End diff --

restored.
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159221102 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Utility class to simulate in memory file like writes, flushes and closing. + */ +public class ContentDump { + private boolean writable = true; + private Map<String, List<String>> filesContent = new HashMap<>(); + + public Set<String> listFiles() { + return filesContent.keySet(); + } + + public void setWritable(boolean writable) { + this.writable = writable; + } + + /** +* Creates an empty file.
+*/ + public ContentWriter createWriter(String name) { + checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name); + filesContent.put(name, new ArrayList<>()); + return new ContentWriter(name, this); + } + + public static void move(String name, ContentDump source, ContentDump target) { + Collection<String> content = source.read(name); + try (ContentWriter contentWriter = target.createWriter(name)) { + contentWriter.write(content).flush(); + } + source.delete(name); + } + + public void delete(String name) { + filesContent.remove(name); + } + + public Collection<String> read(String name) { + List<String> content = filesContent.get(name); + checkState(content != null, "Unknown file [%s]", name); + List<String> result = new ArrayList<>(content); + content.clear(); + return result; + } + + private void putContent(String name, List<String> values) { + List<String> content = filesContent.get(name); + checkState(content != null, "Unknown file [%s]", name); + if (!writable) { + throw new NotWritableException(name); + } + content.addAll(values); + } + + /** +* {@link ContentWriter} represents an abstraction that allows to putContent to the {@link ContentDump}. +*/ + public static class ContentWriter implements AutoCloseable { + private final ContentDump contentDump; + private final String name; + private final List<String> buffer = new ArrayList<>(); + private boolean closed = false; + + private ContentWriter(String name, ContentDump contentDump) { + this.name = checkNotNull(name); + this.contentDump = checkNotNull(contentDump); + } + + public String getName() { + return name; + } + + public ContentWriter write(String value) { + if (closed) { + throw new IllegalStateException(); + } + buffer.add(value); + return this; + } + + public ContentWriter write(Collection<String> values) { + values.forEach(this::write); + return this; + } + + public ContentWriter flush() { + contentDump.putContent(name, buffer); + return this; + } + + public void close() { --- End diff --
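The buffered-write semantics of the `ContentDump`/`ContentWriter` pair in the diff above — nothing a writer produces is visible to readers until an explicit `flush()` — can be sketched in isolation. This is a simplified, hypothetical re-sketch for illustration only: the class and method names (`ContentDumpSketch`, `demo`) are invented, and the real test utility does more (destructive reads, a writability flag, close-state checks).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal, hypothetical re-sketch of the buffered "in-memory file" idea. */
public class ContentDumpSketch {
    private final Map<String, List<String>> files = new HashMap<>();

    /** Writer buffers values locally; nothing is visible until flush(). */
    public class Writer {
        private final String name;
        private final List<String> buffer = new ArrayList<>();

        Writer(String name) {
            files.put(name, new ArrayList<>());
            this.name = name;
        }

        public Writer write(String value) {
            buffer.add(value);
            return this;
        }

        public Writer flush() {
            // Only here does buffered content become readable.
            files.get(name).addAll(buffer);
            buffer.clear();
            return this;
        }
    }

    public List<String> read(String name) {
        return new ArrayList<>(files.get(name));
    }

    public static List<String> demo() {
        ContentDumpSketch dump = new ContentDumpSketch();
        Writer w = dump.new Writer("out.txt");
        w.write("42");                                   // buffered, not yet visible
        List<String> beforeFlush = dump.read("out.txt"); // still empty
        w.flush();                                       // now visible
        List<String> afterFlush = dump.read("out.txt");  // one element
        List<String> result = new ArrayList<>(beforeFlush);
        result.add("flushed:" + afterFlush.size());
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [flushed:1]
    }
}
```

This separation of `write` and `flush` is what lets the test harness simulate a sink failing before or after data is made durable.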
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159224671 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java --- @@ -179,9 +179,6 @@ public void testFailBeforeNotify() throws Exception { harness.initializeState(snapshot); assertExactlyOnce(Arrays.asList("42", "43")); - closeTestHarness(); - - assertEquals(0, tmpDirectory.listFiles().size()); --- End diff -- restored. ---
[jira] [Updated] (FLINK-8119) Cannot submit jobs to YARN Session in FLIP-6 mode
[ https://issues.apache.org/jira/browse/FLINK-8119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8119: - Issue Type: Sub-task (was: Bug) Parent: FLINK-4352 > Cannot submit jobs to YARN Session in FLIP-6 mode > - > > Key: FLINK-8119 > URL: https://issues.apache.org/jira/browse/FLINK-8119 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Cannot submit jobs to YARN Session in FLIP-6 mode because > {{FlinkYarnSessionCli}} becomes the _active_ CLI (should be > {{Flip6DefaultCLI}}). > *Steps to reproduce* > # Build Flink 1.5 {{101fef7397128b0aba23221481ab86f62b18118f}} > # {{bin/yarn-session.sh -flip6 -d -n 1 -s 1 -jm 1024 -tm 1024}} > # {{bin/flink run -flip6 ./examples/streaming/WordCount.jar}} > # Verify that the job will not run. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308116#comment-16308116 ] ASF GitHub Bot commented on FLINK-8328: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159227955 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -660,7 +570,25 @@ public int run( "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, true); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); --- End diff -- I think the executor could as well be in the Monitor. If needed in the future, one could provide a constructor that accepts an external executor (e.g., for unit tests). > Pull Yarn ApplicationStatus polling out of YarnClusterClient > > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has > also the benefit of separating concerns better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308122#comment-16308122 ] ASF GitHub Bot commented on FLINK-8328: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159228367 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -717,7 +645,26 @@ public int run( yarnCluster.waitForClusterToBeReady(); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, acceptInteractiveInput); + + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))){ + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + acceptInteractiveInput); --- End diff -- The code block looks duplicated except for this flag. > Pull Yarn ApplicationStatus polling out of YarnClusterClient > > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has > also the benefit of separating concerns better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159227619 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -717,7 +645,26 @@ public int run( yarnCluster.waitForClusterToBeReady(); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, acceptInteractiveInput); + + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))){ + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + acceptInteractiveInput); + } catch (Exception e) { + LOG.info("Could not properly close the Yarn application status monitor.", e); --- End diff -- Same here. Catch block could be avoided. ---
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308120#comment-16308120 ] ASF GitHub Bot commented on FLINK-8328: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159224695 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -743,6 +690,142 @@ private void logAndSysout(String message) { System.out.println(message); } + public static void main(final String[] args) throws Exception { + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + + final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); + + final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + SecurityUtils.install(new SecurityConfiguration(flinkConfiguration)); + int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() { + return cli.run(args, flinkConfiguration, configurationDirectory); + } + }); + System.exit(retCode); + } + + private static void runInteractiveCli( + YarnClusterClient clusterClient, + YarnApplicationStatusMonitor yarnApplicationStatusMonitor, + boolean readConsoleInput) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { + boolean continueRepl = true; + int numTaskmanagers = 0; + long unknownStatusSince = System.currentTimeMillis(); + + while (continueRepl) { + + final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow(); + + switch (applicationStatus) { + case FAILED: + case CANCELED: + System.err.println("The Flink Yarn cluster has failed."); + continueRepl = false; + break; + case UNKNOWN: + if (unknownStatusSince < 0L) { + unknownStatusSince = System.currentTimeMillis(); + } + + if ((System.currentTimeMillis() - unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS) { + 
System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster."); + continueRepl = false; + } else { + continueRepl = repStep(in, readConsoleInput); + } + break; + case SUCCEEDED: + if (unknownStatusSince > 0L) { + unknownStatusSince = -1L; + } + + // -- check if there are updates by the cluster --- + try { + final GetClusterStatusResponse status = clusterClient.getClusterStatus(); + + if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) { + System.err.println("Number of connected TaskManagers changed to " + + status.numRegisteredTaskManagers() + ". " + + "Slots available: " + status.totalNumberOfSlots()); + numTaskmanagers = status.numRegisteredTaskManagers(); + } + } catch (Exception e) { + LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e); +
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159226314 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -660,7 +570,25 @@ public int run( "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, true); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))) { + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + true); + } catch (Exception e) { --- End diff -- Closing `YarnApplicationStatusMonitor` should not throw any checked exceptions. If you change the signature, this catch block won't be needed. ---
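The reviewer's point — an `AutoCloseable` implementation may narrow `close()`'s `throws Exception` clause, and doing so removes the need for a catch block at every try-with-resources call site — can be illustrated with a small sketch. The names (`CloseSignatureSketch`, `Monitor`) are invented for illustration; this is not the actual Flink class.

```java
/** Sketch: narrowing close()'s throws clause, as suggested in the review. */
public class CloseSignatureSketch {

    // AutoCloseable.close() declares "throws Exception", but an override is
    // allowed to narrow that to no checked exceptions at all.
    static class Monitor implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {   // no "throws Exception" -> callers need no catch
            closed = true;      // e.g. future.cancel(false) throws nothing checked
        }
    }

    public static boolean demo() {
        Monitor m = new Monitor();
        // This compiles without any catch block, precisely because close()
        // declares no checked exceptions.
        try (Monitor inner = m) {
            // use the resource
        }
        return m.closed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```

With the original `throws Exception` signature, the `try` in `demo()` would not compile without a `catch (Exception e)` — which is exactly the boilerplate the review asks to drop.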
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308118#comment-16308118 ] ASF GitHub Bot commented on FLINK-8328: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159227619 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -717,7 +645,26 @@ public int run( yarnCluster.waitForClusterToBeReady(); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, acceptInteractiveInput); + + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))){ + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + acceptInteractiveInput); + } catch (Exception e) { + LOG.info("Could not properly close the Yarn application status monitor.", e); --- End diff -- Same here. Catch block could be avoided. > Pull Yarn ApplicationStatus polling out of YarnClusterClient > > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has > also the benefit of separating concerns better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308119#comment-16308119 ] ASF GitHub Bot commented on FLINK-8328: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159225871 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.cli; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Utility class which monitors the specified yarn application status periodically. 
+ */ +public class YarnApplicationStatusMonitor implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationStatusMonitor.class); + + private static final long UPDATE_INTERVAL = 1000L; + + private final YarnClient yarnClient; + + private final ApplicationId yarnApplicationId; + + private final ScheduledFuture applicationStatusUpdateFuture; + + private volatile ApplicationStatus applicationStatus; + + public YarnApplicationStatusMonitor( + YarnClient yarnClient, + ApplicationId yarnApplicationId, + ScheduledExecutor scheduledExecutor) { + this.yarnClient = Preconditions.checkNotNull(yarnClient); + this.yarnApplicationId = Preconditions.checkNotNull(yarnApplicationId); + + applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay( + this::updateApplicationStatus, + UPDATE_INTERVAL, + UPDATE_INTERVAL, + TimeUnit.MILLISECONDS); + + applicationStatus = ApplicationStatus.UNKNOWN; + } + + public ApplicationStatus getApplicationStatusNow() { + return applicationStatus; + } + + @Override + public void close() throws Exception { + applicationStatusUpdateFuture.cancel(false); --- End diff -- There is no need to declare `throws Exception` here because `cancel()` does not throw any checked exceptions. > Pull Yarn ApplicationStatus polling out of YarnClusterClient > > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has > also the benefit of separating concerns better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308121#comment-16308121 ] ASF GitHub Bot commented on FLINK-8328: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159226314 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -660,7 +570,25 @@ public int run( "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, true); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))) { + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + true); + } catch (Exception e) { --- End diff -- Closing `YarnApplicationStatusMonitor` should not throw any checked exceptions. If you change the signature, this catch block won't be needed. > Pull Yarn ApplicationStatus polling out of YarnClusterClient > > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has > also the benefit of separating concerns better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159224695 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -743,6 +690,142 @@ private void logAndSysout(String message) { System.out.println(message); } + public static void main(final String[] args) throws Exception { + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + + final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); + + final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + SecurityUtils.install(new SecurityConfiguration(flinkConfiguration)); + int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { + @Override + public Integer call() { + return cli.run(args, flinkConfiguration, configurationDirectory); + } + }); + System.exit(retCode); + } + + private static void runInteractiveCli( + YarnClusterClient clusterClient, + YarnApplicationStatusMonitor yarnApplicationStatusMonitor, + boolean readConsoleInput) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { + boolean continueRepl = true; + int numTaskmanagers = 0; + long unknownStatusSince = System.currentTimeMillis(); + + while (continueRepl) { + + final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow(); + + switch (applicationStatus) { + case FAILED: + case CANCELED: + System.err.println("The Flink Yarn cluster has failed."); + continueRepl = false; + break; + case UNKNOWN: + if (unknownStatusSince < 0L) { + unknownStatusSince = System.currentTimeMillis(); + } + + if ((System.currentTimeMillis() - unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS) { + System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster."); + continueRepl = false; + } else { + continueRepl = repStep(in, readConsoleInput); + } + break; + case SUCCEEDED: + if (unknownStatusSince > 0L) { + unknownStatusSince = -1L; + } + + // -- check if there are updates by the cluster --- + try { + final GetClusterStatusResponse status = clusterClient.getClusterStatus(); + + if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) { + System.err.println("Number of connected TaskManagers changed to " + + status.numRegisteredTaskManagers() + ". " + + "Slots available: " + status.totalNumberOfSlots()); + numTaskmanagers = status.numRegisteredTaskManagers(); + } + } catch (Exception e) { + LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e); + } + + printClusterMessages(clusterClient); + + continueRepl = repStep(in, readConsoleInput);
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159225871 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.cli; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Utility class which monitors the specified yarn application status periodically. 
+ */ +public class YarnApplicationStatusMonitor implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationStatusMonitor.class); + + private static final long UPDATE_INTERVAL = 1000L; + + private final YarnClient yarnClient; + + private final ApplicationId yarnApplicationId; + + private final ScheduledFuture applicationStatusUpdateFuture; + + private volatile ApplicationStatus applicationStatus; + + public YarnApplicationStatusMonitor( + YarnClient yarnClient, + ApplicationId yarnApplicationId, + ScheduledExecutor scheduledExecutor) { + this.yarnClient = Preconditions.checkNotNull(yarnClient); + this.yarnApplicationId = Preconditions.checkNotNull(yarnApplicationId); + + applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay( + this::updateApplicationStatus, + UPDATE_INTERVAL, + UPDATE_INTERVAL, + TimeUnit.MILLISECONDS); + + applicationStatus = ApplicationStatus.UNKNOWN; + } + + public ApplicationStatus getApplicationStatusNow() { + return applicationStatus; + } + + @Override + public void close() throws Exception { + applicationStatusUpdateFuture.cancel(false); --- End diff -- There is no need to declare `throws Exception` here because `cancel()` does not throw any checked exceptions. ---
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159227955 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -660,7 +570,25 @@ public int run( "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, true); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); --- End diff -- I think the executor could as well be in the Monitor. If needed in the future, one could provide a constructor that accepts an external executor (e.g., for unit tests). ---
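The suggestion above — let the monitor own its executor by default, and offer a constructor that accepts an external one for unit tests — can be sketched as follows. All names (`OwnedExecutorMonitor`, `demo`, `pollCount`) are invented for illustration; this is not the actual `YarnApplicationStatusMonitor`, and the polled work is a stand-in counter rather than a YARN status query.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: monitor owns its executor, with an injectable overload. */
public class OwnedExecutorMonitor implements AutoCloseable {
    private final ScheduledExecutorService executor;
    private final boolean ownsExecutor;          // only shut down what we created
    private final ScheduledFuture<?> pollFuture;
    private final AtomicInteger polls = new AtomicInteger();

    public OwnedExecutorMonitor() {              // production: own the executor
        this(Executors.newSingleThreadScheduledExecutor(), true);
    }

    public OwnedExecutorMonitor(ScheduledExecutorService executor) { // tests: inject
        this(executor, false);
    }

    private OwnedExecutorMonitor(ScheduledExecutorService executor, boolean owns) {
        this.executor = executor;
        this.ownsExecutor = owns;
        // Stand-in for the periodic application-status poll.
        this.pollFuture = executor.scheduleWithFixedDelay(
                polls::incrementAndGet, 0L, 10L, TimeUnit.MILLISECONDS);
    }

    public int pollCount() {
        return polls.get();
    }

    @Override
    public void close() {                        // no checked exceptions
        pollFuture.cancel(false);
        if (ownsExecutor) {
            executor.shutdownNow();              // owned executor dies with us
        }
    }

    public static boolean demo() {
        try (OwnedExecutorMonitor monitor = new OwnedExecutorMonitor()) {
            try {
                Thread.sleep(100);               // give the poll time to fire
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return monitor.pollCount() > 0;      // polling actually ran
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Moving ownership into the monitor removes the executor setup from both call sites in `FlinkYarnSessionCli`, while the injecting overload keeps tests deterministic.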
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308115#comment-16308115 ] ASF GitHub Bot commented on FLINK-8328: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159227421 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -660,7 +570,25 @@ public int run( "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, true); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))) { --- End diff -- Why do we need to use the `ScheduledExecutor` interface from Flink? Why not use Java's `ScheduledExecutorService` directly? > Pull Yarn ApplicationStatus polling out of YarnClusterClient > > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has > also the benefit of separating concerns better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159228367 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -717,7 +645,26 @@ public int run( yarnCluster.waitForClusterToBeReady(); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, acceptInteractiveInput); + + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))){ + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + acceptInteractiveInput); --- End diff -- The code block looks duplicated except for this flag. ---
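One way to address the duplication noted above is to extract the shared try-with-resources block into a single helper that takes the only varying piece, the `readConsoleInput` flag. The sketch below is purely illustrative (placeholder types, not the actual `FlinkYarnSessionCli` code):

```java
// Hypothetical sketch: both call sites in run() would delegate to one
// helper, differing only in the boolean flag they pass.
public class DedupSketch {

    // Stand-in for a status monitor; AutoCloseable so it can be used in
    // try-with-resources just like the real one.
    public interface StatusMonitor extends AutoCloseable {
        String currentStatus();

        @Override
        void close(); // narrowed to not throw, for brevity
    }

    public static String runSession(StatusMonitor monitor, boolean readConsoleInput) {
        try (StatusMonitor m = monitor) {
            // The real helper would run the interactive CLI loop here.
            return (readConsoleInput ? "interactive:" : "detached:") + m.currentStatus();
        }
    }

    public static void main(String[] args) {
        StatusMonitor stub = new StatusMonitor() {
            @Override public String currentStatus() { return "RUNNING"; }
            @Override public void close() { }
        };
        System.out.println(runSession(stub, true));
    }
}
```

With this shape, the monitor's setup and teardown live in exactly one place, and each call site shrinks to a single line.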
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308117#comment-16308117 ] ASF GitHub Bot commented on FLINK-8328: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159230885 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -743,6 +690,142 @@ private void logAndSysout(String message) { System.out.println(message); } + public static void main(final String[] args) throws Exception { + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + + final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); + + final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + SecurityUtils.install(new SecurityConfiguration(flinkConfiguration)); + int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() { + return cli.run(args, flinkConfiguration, configurationDirectory); + } + }); + System.exit(retCode); + } + + private static void runInteractiveCli( + YarnClusterClient clusterClient, + YarnApplicationStatusMonitor yarnApplicationStatusMonitor, + boolean readConsoleInput) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { + boolean continueRepl = true; + int numTaskmanagers = 0; + long unknownStatusSince = System.currentTimeMillis(); --- End diff -- nit: `System.nanoTime()` should be preferred to measure elapsed time because it does not depend on wall clock, i.e., it is not affected by the user changing the system's time: https://stackoverflow.com/a/351571 However, if you use `nanoTime()`, the trick in line `729` with negative `unknownStatusSince` won't work. 
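The `nanoTime()` remark above can be accommodated by replacing the negative-sentinel trick with an explicit flag plus a monotonic timestamp. A hypothetical sketch, not the actual `FlinkYarnSessionCli` code:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: track "status unknown since" with System.nanoTime()
// and an explicit boolean, instead of encoding the known/unknown state in
// the sign of a wall-clock timestamp.
public class ElapsedSketch {

    private boolean statusUnknown;
    private long unknownSinceNanos;

    public void onStatusUnknown() {
        if (!statusUnknown) {
            statusUnknown = true;
            // Monotonic clock: immune to the user changing the system time.
            unknownSinceNanos = System.nanoTime();
        }
    }

    public void onStatusKnown() {
        statusUnknown = false;
    }

    // nowNanos is passed in explicitly to keep the logic testable.
    public boolean unknownLongerThan(long timeout, TimeUnit unit, long nowNanos) {
        // nanoTime values must be compared via subtraction to be overflow-safe.
        return statusUnknown && nowNanos - unknownSinceNanos > unit.toNanos(timeout);
    }

    public static void main(String[] args) {
        ElapsedSketch tracker = new ElapsedSketch();
        tracker.onStatusUnknown();
        System.out.println(tracker.unknownLongerThan(1, TimeUnit.HOURS, System.nanoTime()));
    }
}
```

The boolean makes the "not currently unknown" state explicit, so no magic negative value is needed and the subtraction stays valid for `nanoTime()`'s arbitrary origin.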
[jira] [Commented] (FLINK-8348) Print help for DefaultCLI
[ https://issues.apache.org/jira/browse/FLINK-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308113#comment-16308113 ] ASF GitHub Bot commented on FLINK-8348: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5233 [FLINK-8348] [flip6] Print help for DefaultCLI ## What is the purpose of the change Print help for the `DefaultCLI`. This PR is based on #5232. ## Verifying this change - Trivial change ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCustomCommandLineHelp Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5233.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5233 commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b Author: Till Rohrmann Date: 2017-12-07T12:57:24Z [FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns.
commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 192adb786a48d19b71e797355500652d51de6296 Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. commit a73b6e2850d4b2445a835914ba570d1057e59dfb Author: Till Rohrmann Date: 2018-01-02T06:42:18Z [FLINK-8333] [flip6] Separate deployment options from command options This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package. commit 57df8f9e1c5f4015cd6543ded43849a71003ef36 Author: Till Rohrmann Date: 2018-01-02T06:59:34Z [FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend This commit changes how CustomCommandLines are registered at the CliFrontend. Henceforth, the CliFrontend is initialized with the set of CustomCommandLines instead of registering them statically. This improves maintainability and testability. commit 2ee8059b949b4a9c2e165f3e78716ae531e5c0e7 Author: Till Rohrmann Date: 2018-01-02T08:22:12Z [FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor Instead of directly retrieving or deploying a Flink cluster, the CustomCommandLine now only returns a ClusterDescriptor which can be used for these operations. 
This disentangles the ClusterDescriptor and the CustomCommandLine a bit better, supporting proper lifecycle management of the former. commit 9ff62c7cfadc977d02f414ee26fae6b76a568eda Author: Till Rohrmann Date: 2018-01-02T08:55:13Z [FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine Since the Configuration does not change over the lifetime of a CustomCommandLine, we can safely pass it as a constructor argument instead of as a method argument. commit 2b6fda525fb737a76b055510f8081003010237ed Author: Till Rohrmann Date: 2017-12-20T15:32:18Z [FLINK-8341] [flip6] Remove not needed options from CommandLineOptions commit 17755e5273364ccfc740d6b4a31b148d582d8cfa Author: Till Rohrmann