[jira] [Commented] (FLINK-8413) Snapshot state of aggregated data is not maintained in flink's checkpointing
[ https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323645#comment-16323645 ] Fabian Hueske commented on FLINK-8413: -- This might also an issue with Beam's Flink runner. In this case, the issue should be reported to Beam's JIRA. [~aljoscha] might be able to help here. > Snapshot state of aggregated data is not maintained in flink's checkpointing > > > Key: FLINK-8413 > URL: https://issues.apache.org/jira/browse/FLINK-8413 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: suganya > > We have a project which consumes events from kafka,does a groupby in a time > window(5 mins),after window elapses it pushes the events to downstream for > merge.This project is deployed using flink ,we have enabled checkpointing to > recover from failed state. > (windowsize: 5mins , checkpointingInterval: 5mins,state.backend: filesystem) > Offsets from kafka get checkpointed every 5 > mins(checkpointingInterval).Before finishing the entire DAG(groupBy and > merge) , events offsets are getting checkpointed.So incase of any restart > from task-manager ,new task gets started from last successful checkpoint ,but > we could'nt able to get the aggregated snapshot data(data from groupBy task) > from the persisted checkpoint. > Able to retrieve the last successful checkpointed offset from kafka ,but > couldnt able to get last aggregated data till checkpointing. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323611#comment-16323611 ] Bowen Li commented on FLINK-3089: - [~xfournet] yes, supporting only TTL in processing time would be another important assumption I'd like to make! Implementing TTL in HeapStateBackend can be a bit tricky because of the number of timers as you mentioned. W.r.t. development plan, I'm think we can probably add the interface and support TTL in RocksDBStateBackend first. Then we can decide 1) whether making it a RocksDBStateBackend-only feature, like incremental checkpointing 2) if we should support TTL in HeapStateBackend, how to implement it What do you think? cc [~srichter] [~aljoscha] > State API Should Support Data Expiration (State TTL) > > > Key: FLINK-3089 > URL: https://issues.apache.org/jira/browse/FLINK-3089 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Niels Basjes >Assignee: Bowen Li > > In some usecases (webanalytics) there is a need to have a state per visitor > on a website (i.e. keyBy(sessionid) ). > At some point the visitor simply leaves and no longer creates new events (so > a special 'end of session' event will not occur). > The only way to determine that a visitor has left is by choosing a timeout, > like "After 30 minutes no events we consider the visitor 'gone'". > Only after this (chosen) timeout has expired should we discard this state. > In the Trigger part of Windows we can set a timer and close/discard this kind > of information. But that introduces the buffering effect of the window (which > in some scenarios is unwanted). > What I would like is to be able to set a timeout on a specific state which I > can update afterwards. > This makes it possible to create a map function that assigns the right value > and that discards the state automatically. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-8413) Snapshot state of aggregated data is not maintained in flink's checkpointing
[ https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323546#comment-16323546 ] suganya edited comment on FLINK-8413 at 1/12/18 5:09 AM: - The following code is using apache beam libraries to create a pipeline.Please find the code. public void run(String[] args) { BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args).withValidation() .as(BeamCLIOptions.class); Pipeline pipeline = Pipeline.create(beamCliOptions); MergeDistribution mergeDistribution = MergeDistribution .valueOf(beamCliOptions.getMergeDistribution()); MergeDistribution fixedWindowDuration = MergeDistribution .valueOf(beamCliOptions.getFixedWindowSize()); KafkaIO.ReadkafkaEntityStreamReader = KafkaIO. read() .withBootstrapServers(beamCliOptions.getKafkaServers()) .withTopic(beamCliOptions.getKafkaTopic()) .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class) .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "latest","enable.auto.commit","true")); pipeline.apply(kafkaEntityStreamReader.withoutMetadata()) .apply(Values.create()) .apply(Window.into( FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins( .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins() .discardingFiredPanes() .withAllowedLateness(Duration.ZERO)) .apply(ParDo.of(new ExtractDataFn( beamCliOptions.getDatePartitionKey(), new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis( .apply("Applying GroupByKey on -MM-DD HH ", GroupByKey.create()) .apply("Applying Merge ", ParDo.of(new MergeDataFn(beamCliOptions))); pipeline.run(); } was (Author: suganyap): The following code is using apache beam libraries to create a pipeline.Please find the code. public void run(String[] args) { BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args).withValidation() .as(BeamCLIOptions.class); Pipeline pipeline = Pipeline.create(beamCliOptions); MergeDistribution mergeDistribution = MergeDistribution .valueOf(beamCliOptions.getMergeDistribution()); MergeDistribution fixedWindowDuration = MergeDistribution .valueOf(beamCliOptions.getFixedWindowSize()); KafkaIO.Read kafkaEntityStreamReader = KafkaIO. read() .withBootstrapServers(beamCliOptions.getKafkaServers()) .withTopic(beamCliOptions.getKafkaTopic()) .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class) .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "latest","enable.auto.commit","true")); pipeline.apply(kafkaEntityStreamReader.withoutMetadata()) .apply(Values.create()) .apply(Window.into( FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins( .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins() .discardingFiredPanes() .withAllowedLateness(Duration.ZERO)) .apply(ParDo.of(new ExtractDataFn( beamCliOptions.getDatePartitionKey(), new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis( .apply("Applying GroupByKey on -MM-DD HH ", GroupByKey.create()) .apply("Applying Merge ", ParDo.of(new MergeDataFn(beamCliOptions))); pipeline.run(); } > Snapshot state of aggregated data is not maintained in flink's checkpointing > > > Key: FLINK-8413 > URL: https://issues.apache.org/jira/browse/FLINK-8413 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: suganya > > We have a project which consumes events from kafka,does a groupby in a time > window(5 mins),after window elapses it pushes the events to downstream for > merge.This project is deployed using flink ,we have enabled checkpointing to > recover from failed state. > (windowsize: 5mins , checkpointingInterval: 5mins,state.backend: filesystem) > Offsets from kafka get checkpointed every 5 > mins(checkpointingInterval).Before finishing the entire DAG(groupBy and > merge) , events offsets are getting checkpointed.So incase of any
[jira] [Commented] (FLINK-8413) Snapshot state of aggregated data is not maintained in flink's checkpointing
[ https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323546#comment-16323546 ] suganya commented on FLINK-8413: The following code is using apache beam libraries to create a pipeline.Please find the code. public void run(String[] args) { BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args).withValidation() .as(BeamCLIOptions.class); Pipeline pipeline = Pipeline.create(beamCliOptions); MergeDistribution mergeDistribution = MergeDistribution .valueOf(beamCliOptions.getMergeDistribution()); MergeDistribution fixedWindowDuration = MergeDistribution .valueOf(beamCliOptions.getFixedWindowSize()); KafkaIO.ReadkafkaEntityStreamReader = KafkaIO. read() .withBootstrapServers(beamCliOptions.getKafkaServers()) .withTopic(beamCliOptions.getKafkaTopic()) .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class) .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "latest","enable.auto.commit","true")); pipeline.apply(kafkaEntityStreamReader.withoutMetadata()) .apply(Values.create()) .apply(Window.into( FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins( .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins() .discardingFiredPanes() .withAllowedLateness(Duration.ZERO)) .apply(ParDo.of(new ExtractDataFn( beamCliOptions.getDatePartitionKey(), new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis( .apply("Applying GroupByKey on -MM-DD HH ", GroupByKey.create()) .apply("Applying Merge ", ParDo.of(new MergeDataFn(beamCliOptions))); pipeline.run(); } > Snapshot state of aggregated data is not maintained in flink's checkpointing > > > Key: FLINK-8413 > URL: https://issues.apache.org/jira/browse/FLINK-8413 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: suganya > > We have a project which consumes events from kafka,does a groupby in a time > window(5 mins),after window elapses it pushes the events to downstream for > merge.This project is deployed using flink ,we have enabled checkpointing to > recover from failed state. > (windowsize: 5mins , checkpointingInterval: 5mins,state.backend: filesystem) > Offsets from kafka get checkpointed every 5 > mins(checkpointingInterval).Before finishing the entire DAG(groupBy and > merge) , events offsets are getting checkpointed.So incase of any restart > from task-manager ,new task gets started from last successful checkpoint ,but > we could'nt able to get the aggregated snapshot data(data from groupBy task) > from the persisted checkpoint. > Able to retrieve the last successful checkpointed offset from kafka ,but > couldnt able to get last aggregated data till checkpointing. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5280: [hotfix] Fix typo in AbstractMetricGroup.java
Github user maqingxiang commented on the issue: https://github.com/apache/flink/pull/5280 Done, sorry. ---
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323447#comment-16323447 ] ASF GitHub Bot commented on FLINK-8162: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5182 Thanks @casidiablo. Merging ... > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' millisBehi...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5182 Thanks @casidiablo. Merging ... ---
[jira] [Commented] (FLINK-3296) DataStream.write*() methods are not flushing properly
[ https://issues.apache.org/jira/browse/FLINK-3296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323442#comment-16323442 ] ASF GitHub Bot commented on FLINK-3296: --- Github user coveralls commented on the issue: https://github.com/apache/flink/pull/1563 [![Coverage Status](https://coveralls.io/builds/15015307/badge)](https://coveralls.io/builds/15015307) Changes Unknown when pulling **df49d5bb8ba778cdd17b94318c4bf48c6d1747ad on rmetzger:flink3296** into ** on apache:master**. > DataStream.write*() methods are not flushing properly > - > > Key: FLINK-3296 > URL: https://issues.apache.org/jira/browse/FLINK-3296 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Critical > Fix For: 1.0.0 > > > The DataStream.write() methods rely on the {{FileSinkFunctionByMillis}} > class, which has a logic for flushing records, even though the underlying > stream is never flushed. This is misleading for users as files are not > written as they would expect it. > The code was initial written with FileOutputFormats in mind, but the types > were not set correctly. This PR opened the write() method to any output > format: https://github.com/apache/flink/pull/706/files -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #1563: [FLINK-3296] Remove 'flushing' behavior of the OutputForm...
Github user coveralls commented on the issue: https://github.com/apache/flink/pull/1563 [![Coverage Status](https://coveralls.io/builds/15015307/badge)](https://coveralls.io/builds/15015307) Changes Unknown when pulling **df49d5bb8ba778cdd17b94318c4bf48c6d1747ad on rmetzger:flink3296** into ** on apache:master**. ---
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323429#comment-16323429 ] ASF GitHub Bot commented on FLINK-8306: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5284 [FLINK-8306] [kafka, tests] Fix invalid mock verifications on final method ## What is the purpose of the change This is a reworked version of #5200. Instead of introducing a new interface while we are still unsure of how the refactoring of the Kafka consumer should head towards, this version is a simple fix to have proper mocks in the `FlinkKafkaConsumerBase`. Only the last two commits are relevant (PR is based on #5188). ## Brief change log - ca1aa00 Remove invalid mock verifications, and introduce a proper `MockFetcher` mock. - d08727c is a hotfix that corrects a stale comment on a no-longer existing behaviour ## Verifying this change This change is a code cleanup, and is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8306-reworked Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5284.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5284 commit 735972332ff25623898b25f210b7277aafbc7351 Author: Tzu-Li (Gordon) TaiDate: 2017-12-20T00:10:44Z [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection Reflection was mainly used to inject mocks into private fields of the FlinkKafkaConsumerBase, without the need to fully execute all operator life cycle methods. This, however, caused the unit tests to be too implementation-specific. This commit reworks the FlinkKafkaConsumerBaseTest to remove test consumer instantiation methods that rely on reflection for dependency injection. All tests now instantiate dummy test consumers normally, and let all tests properly execute all operator life cycle methods regardless of the tested logic. commit ca1aa0074c0ae4ff8e2da4f8d87b8378c2413ef5 Author: Tzu-Li (Gordon) Tai Date: 2018-01-12T00:45:32Z [FLINK-8306] [kafka, tests] Fix mock verifications on final method Previously, offset commit behavioural tests relied on verifying on AbstractFetcher::commitInternalOffsetsToKafka(). That method is actually final, and could not be mocked. This commit fixes that by implementing a proper mock AbstractFetcher, which keeps track of the offset commits that go through. commit d08727c4f8816e408dabc12a673ad24593b27023 Author: Tzu-Li (Gordon) Tai Date: 2017-12-20T19:54:40Z [hotfix] [kafka] Remove stale comment on publishing procedures of AbstractFetcher The previous comment mentioned "only now will the fetcher return at least the restored offsets when calling snapshotCurrentState()". This is a remnant of the previous fetcher initialization behaviour, where in the past the fetcher wasn't directly seeded with restored offsets on instantiation. Since this is no longer true, this commit fixes the stale comment to avoid confusion. > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323430#comment-16323430 ] ASF GitHub Bot commented on FLINK-8306: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 Subsumed be re-opened PR: #5284 > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to simply make that method non-final, that is not ideal since it > would be best that the method is left final to prevent overrides in > subclasses. > This suggests that offset committing functionality is too tightly coupled > with the {{AbstractFetcher}}, making it hard to perform concise tests to > verify offset committing. > I suggest that we decouple record fetching and offset committing as separate > services behind different interfaces. We should introduce a new interface, > say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we > can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323431#comment-16323431 ] ASF GitHub Bot commented on FLINK-8306: --- Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/5200 > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to simply make that method non-final, that is not ideal since it > would be best that the method is left final to prevent overrides in > subclasses. > This suggests that offset committing functionality is too tightly coupled > with the {{AbstractFetcher}}, making it hard to perform concise tests to > verify offset committing. > I suggest that we decouple record fetching and offset committing as separate > services behind different interfaces. We should introduce a new interface, > say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we > can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...
Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/5200 ---
[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 Subsumed be re-opened PR: #5284 ---
[GitHub] flink pull request #5284: [FLINK-8306] [kafka, tests] Fix invalid mock verif...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5284 [FLINK-8306] [kafka, tests] Fix invalid mock verifications on final method ## What is the purpose of the change This is a reworked version of #5200. Instead of introducing a new interface while we are still unsure of how the refactoring of the Kafka consumer should head towards, this version is a simple fix to have proper mocks in the `FlinkKafkaConsumerBase`. Only the last two commits are relevant (PR is based on #5188). ## Brief change log - ca1aa00 Remove invalid mock verifications, and introduce a proper `MockFetcher` mock. - d08727c is a hotfix that corrects a stale comment on a no-longer existing behaviour ## Verifying this change This change is a code cleanup, and is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8306-reworked Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5284.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5284 commit 735972332ff25623898b25f210b7277aafbc7351 Author: Tzu-Li (Gordon) TaiDate: 2017-12-20T00:10:44Z [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection Reflection was mainly used to inject mocks into private fields of the FlinkKafkaConsumerBase, without the need to fully execute all operator life cycle methods. This, however, caused the unit tests to be too implementation-specific. This commit reworks the FlinkKafkaConsumerBaseTest to remove test consumer instantiation methods that rely on reflection for dependency injection. All tests now instantiate dummy test consumers normally, and let all tests properly execute all operator life cycle methods regardless of the tested logic. commit ca1aa0074c0ae4ff8e2da4f8d87b8378c2413ef5 Author: Tzu-Li (Gordon) Tai Date: 2018-01-12T00:45:32Z [FLINK-8306] [kafka, tests] Fix mock verifications on final method Previously, offset commit behavioural tests relied on verifying on AbstractFetcher::commitInternalOffsetsToKafka(). That method is actually final, and could not be mocked. This commit fixes that by implementing a proper mock AbstractFetcher, which keeps track of the offset commits that go through. commit d08727c4f8816e408dabc12a673ad24593b27023 Author: Tzu-Li (Gordon) Tai Date: 2017-12-20T19:54:40Z [hotfix] [kafka] Remove stale comment on publishing procedures of AbstractFetcher The previous comment mentioned "only now will the fetcher return at least the restored offsets when calling snapshotCurrentState()". This is a remnant of the previous fetcher initialization behaviour, where in the past the fetcher wasn't directly seeded with restored offsets on instantiation. Since this is no longer true, this commit fixes the stale comment to avoid confusion. ---
[jira] [Created] (FLINK-8419) Kafka consumer's offset metrics are not registered for dynamically discovered partitions
Tzu-Li (Gordon) Tai created FLINK-8419: -- Summary: Kafka consumer's offset metrics are not registered for dynamically discovered partitions Key: FLINK-8419 URL: https://issues.apache.org/jira/browse/FLINK-8419 Project: Flink Issue Type: Bug Components: Kafka Connector, Metrics Affects Versions: 1.4.0, 1.5.0 Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Priority: Blocker Fix For: 1.5.0, 1.4.1 Currently, the per-partition offset metrics are registered via the {{AbstractFetcher#addOffsetStateGauge}} method. That method is only ever called for the initial startup partitions, and not for dynamically discovered partitions. We should consider adding some unit tests to make sure that metrics are properly registered for all partitions. That would also safeguard us from accidentally removing metrics. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8092) Makes no difference if python script is found or not
[ https://issues.apache.org/jira/browse/FLINK-8092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8092. --- Resolution: Duplicate Fix Version/s: 1.4.0 > Makes no difference if python script is found or not > > > Key: FLINK-8092 > URL: https://issues.apache.org/jira/browse/FLINK-8092 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.3.2 > Environment: The error message is the SAME whether the right path or > not for a python job, as below: > [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.py > Cluster configuration: Standalone cluster with JobManager at > localhost/https://www.linkedin.com/redir/invalid-link-page?url=127%2e0%2e0%2e1%3A6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ >The program didn't contain a Flink job. Perhaps you forgot to call execute() > on the execution environment. > [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.pys > Cluster configuration: Standalone cluster with JobManager at > localhost/https://www.linkedin.com/redir/invalid-link-page?url=127%2e0%2e0%2e1%3A6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ > The program didn't contain a Flink job. Perhaps you forgot to call execute() > on the execution environment. > The first link is correct and was tested. The second is a fake. Still, the > error is the SAME. Bug? >Reporter: Al Costa > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (FLINK-8092) Makes no difference if python script is found or not
[ https://issues.apache.org/jira/browse/FLINK-8092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-8092: - > Makes no difference if python script is found or not > > > Key: FLINK-8092 > URL: https://issues.apache.org/jira/browse/FLINK-8092 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.3.2 > Environment: The error message is the SAME whether the right path or > not for a python job, as below: > [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.py > Cluster configuration: Standalone cluster with JobManager at > localhost/https://www.linkedin.com/redir/invalid-link-page?url=127%2e0%2e0%2e1%3A6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ >The program didn't contain a Flink job. Perhaps you forgot to call execute() > on the execution environment. > [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.pys > Cluster configuration: Standalone cluster with JobManager at > localhost/https://www.linkedin.com/redir/invalid-link-page?url=127%2e0%2e0%2e1%3A6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ > The program didn't contain a Flink job. Perhaps you forgot to call execute() > on the execution environment. > The first link is correct and was tested. The second is a fake. Still, the > error is the SAME. Bug? >Reporter: Al Costa > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8092) Makes no difference if python script is found or not
[ https://issues.apache.org/jira/browse/FLINK-8092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8092. --- Resolution: Fixed > Makes no difference if python script is found or not > > > Key: FLINK-8092 > URL: https://issues.apache.org/jira/browse/FLINK-8092 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.3.2 > Environment: The error message is the SAME whether the right path or > not for a python job, as below: > [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.py > Cluster configuration: Standalone cluster with JobManager at > localhost/https://www.linkedin.com/redir/invalid-link-page?url=127%2e0%2e0%2e1%3A6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ >The program didn't contain a Flink job. Perhaps you forgot to call execute() > on the execution environment. > [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.pys > Cluster configuration: Standalone cluster with JobManager at > localhost/https://www.linkedin.com/redir/invalid-link-page?url=127%2e0%2e0%2e1%3A6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ > The program didn't contain a Flink job. Perhaps you forgot to call execute() > on the execution environment. > The first link is correct and was tested. The second is a fake. Still, the > error is the SAME. Bug? >Reporter: Al Costa > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8102) Formatting issues in Mesos documentation.
[ https://issues.apache.org/jira/browse/FLINK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8102. --- Resolution: Fixed Fix Version/s: 1.4.0 1.4: 28b3115d23ab6844583578f7f2c7c37316c199be master: cb73078e63e74ca19b37b07b54206c0985cec924 > Formatting issues in Mesos documentation. > -- > > Key: FLINK-8102 > URL: https://issues.apache.org/jira/browse/FLINK-8102 > Project: Flink > Issue Type: Bug >Reporter: Jörg Schad >Assignee: Jörg Schad >Priority: Minor > Fix For: 1.4.0 > > > The Flink documentation renders incorrectly as some characters are not > probably escaped. > https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html > https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#mesos -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7787) Remove guava dependency in the cassandra connector
[ https://issues.apache.org/jira/browse/FLINK-7787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7787. --- Resolution: Won't Fix > Remove guava dependency in the cassandra connector > -- > > Key: FLINK-7787 > URL: https://issues.apache.org/jira/browse/FLINK-7787 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Haohui Mai >Assignee: Haohui Mai > > As discovered in FLINK-6225, the cassandra connector uses the future classes > in the guava library. We can get rid of the dependency by using the > equivalent classes provided by Java 8. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7927) Different Netty Versions in dependencies of flink-runtime make it impossible to use 3rd party libraries using netty
[ https://issues.apache.org/jira/browse/FLINK-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7927. --- Resolution: Fixed Fix Version/s: 1.4.0 All netty dependencies have been relocated and will no longer cause conflicts. > Different Netty Versions in dependencies of flink-runtime make it impossible > to use 3rd party libraries using netty > --- > > Key: FLINK-7927 > URL: https://issues.apache.org/jira/browse/FLINK-7927 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.2 > Environment: * Windows 10 x64 > * Java 1.8 >Reporter: Claudius Eisele > Fix For: 1.4.0 > > > I am trying to use Google PubSub (google-cloud-pubsub 0.26.0-beta) in a Flink > streaming job but I am receiving the following error when executing it so > unfortunately it's not possible to use PubSub in a Flink Streaming Job: > {code:java} > ... > 10/25/2017 22:38:02 Source: Custom Source -> Map(1/1) switched to RUNNING > 10/25/2017 22:38:03 Source: Custom Source -> Map(1/1) switched to FAILED > java.lang.IllegalStateException: Expected the service InnerService [FAILED] > to be RUNNING, but the service has FAILED > at > com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:328) > at > com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266) > at > com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97) > > Caused by: java.lang.IllegalArgumentException: Jetty ALPN/NPN has not been > properly configured. > at > io.grpc.netty.GrpcSslContexts.selectApplicationProtocolConfig(GrpcSslContexts.java:159) > at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:136) > at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:124) > at io.grpc.netty.GrpcSslContexts.forClient(GrpcSslContexts.java:94) > at > io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:525) > at > io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:518) > at > io.grpc.netty.NettyChannelBuilder$NettyTransportFactory.(NettyChannelBuilder.java:457) > at > io.grpc.netty.NettyChannelBuilder.buildTransportFactory(NettyChannelBuilder.java:326) > at > io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:315) > at > com.google.api.gax.grpc.InstantiatingChannelProvider.createChannel(InstantiatingChannelProvider.java:131) > at > com.google.api.gax.grpc.InstantiatingChannelProvider.getChannel(InstantiatingChannelProvider.java:116) > at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:246) > at > com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:149) > at > com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:211) > at > com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:121) > at > com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:235) > ... 7 more > {code} > I reported this problem to the Google Cloud Java Library but the problem > seems more to be in Flink or its dependencies like akka because there are a > lot of netty dependencies with different versions in it: > * Apache Zookeeper (flink-runtime dependency) has \--- > io.netty:netty:3.7.0.Final -> 3.8.0.Final > * Flakka (flink-runtime dependency) has io.netty:netty:3.8.0.Final > * Flink-Runtime has io.netty:netty-all:4.0.27.Final > In my case, Google Cloud PubSub has io.grpc:grpc-netty:1.6.1 > Additional information on the issue in combination with Google Cloud PubSub > can be found here: > https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2398 > https://github.com/grpc/grpc-java/issues/3025 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7746) move ResultPartitionWriter#writeBufferToAllChannels up into ResultPartition
[ https://issues.apache.org/jira/browse/FLINK-7746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323354#comment-16323354 ] Chesnay Schepler edited comment on FLINK-7746 at 1/12/18 1:18 AM: -- [~NicoK] can this be closed (along with the parent issue)? was (Author: zentol): [~NicoK] can this be closed? > move ResultPartitionWriter#writeBufferToAllChannels up into ResultPartition > --- > > Key: FLINK-7746 > URL: https://issues.apache.org/jira/browse/FLINK-7746 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > As a step towards removing the (unneeded) {{ResultPartitionWriter}} wrapper, > we should move {{ResultPartitionWriter#writeBufferToAllChannels}} into > {{ResultPartition}} where single-channel writing happens anyway. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7746) move ResultPartitionWriter#writeBufferToAllChannels up into ResultPartition
[ https://issues.apache.org/jira/browse/FLINK-7746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323354#comment-16323354 ] Chesnay Schepler commented on FLINK-7746: - [~NicoK] can this be closed? > move ResultPartitionWriter#writeBufferToAllChannels up into ResultPartition > --- > > Key: FLINK-7746 > URL: https://issues.apache.org/jira/browse/FLINK-7746 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > As a step towards removing the (unneeded) {{ResultPartitionWriter}} wrapper, > we should move {{ResultPartitionWriter#writeBufferToAllChannels}} into > {{ResultPartition}} where single-channel writing happens anyway. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7364) Log exceptions from user code in streaming jobs
[ https://issues.apache.org/jira/browse/FLINK-7364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7364. --- Resolution: Cannot Reproduce > Log exceptions from user code in streaming jobs > --- > > Key: FLINK-7364 > URL: https://issues.apache.org/jira/browse/FLINK-7364 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.1 >Reporter: Elias Levy > > Currently, if an exception arises in user supplied code within an operator in > a streaming job, Flink terminates the job, but it fails to record the reason > for the termination. The logs do not record that there was an exception at > all, much less recording the type of exception and where it occurred. This > makes it difficult to debug jobs without implementing exception recording > code on all user supplied operators. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6375) Fix LongValue hashCode
[ https://issues.apache.org/jira/browse/FLINK-6375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-6375: Issue Type: Sub-task (was: Improvement) Parent: FLINK-3957 > Fix LongValue hashCode > -- > > Key: FLINK-6375 > URL: https://issues.apache.org/jira/browse/FLINK-6375 > Project: Flink > Issue Type: Sub-task > Components: Core >Affects Versions: 2.0.0 >Reporter: Greg Hogan >Assignee: Andrew Psaltis >Priority: Trivial > > Match {{LongValue.hashCode}} to {{Long.hashCode}} (and the other numeric > types) by simply adding the high and low words rather than shifting the hash > by adding 43. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5280: Fix typo in AbstractMetricGroup.java
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5280 Thanks, merging this. Do note that you should include the `[hotfix]` and `[doc]` tags to the commit message. All our commit requires proper tags to indicate the issue id (or `hotfix` for typos), and ideally also the fixed component. I'll do it for you while merging this time ;) ---
[jira] [Closed] (FLINK-6234) HTTP/HTTPS WebServer Extension
[ https://issues.apache.org/jira/browse/FLINK-6234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-6234. --- Resolution: Invalid Fix Version/s: (was: 2.0.0) Not enough information was provided. > HTTP/HTTPS WebServer Extension > -- > > Key: FLINK-6234 > URL: https://issues.apache.org/jira/browse/FLINK-6234 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 2.0.0 > Environment: All >Reporter: Abey Sam Alex > Labels: WebFramework > Original Estimate: 1,008h > Remaining Estimate: 1,008h > > All of the existing NIO based Java webserver have their processing locked on > to a single machine and Web Frameworks are locked down to single instances. > Although by nature Every web request is like an endless stream of events > which have multiple pipe within it. In nature the very method of its > functioning is very similar to have Flink works. > Most of the Web Server scaling is limited to adding more servers but rarely > utilizing all of the phsyical cores or pipelining the intermediate steps. > There are other advantages which the flink can bring about by having an > extensions for Web Servers. > The intent is to provide an extension with which once can write an Web > Application using the Flink Framework and take advantage of the streaming > architecture and have easy scalability by adding new nodes at run time. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-6206: Summary: Log task state transitions as warn/error for FAILURE scenarios (was: As an Engineer, I want task state transition log to be warn/error for FAILURE scenarios) > Log task state transitions as warn/error for FAILURE scenarios > -- > > Key: FLINK-6206 > URL: https://issues.apache.org/jira/browse/FLINK-6206 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Dan Bress >Priority: Critical > > If a task fails due to an exception, I would like that to be logged at a warn > or an error level. currently its info > {code} > private boolean transitionState(ExecutionState currentState, ExecutionState > newState, Throwable cause) { > if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { > if (cause == null) { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState); > } else { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState, cause); > } > return true; > } else { > return false; > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5283: [hotfix][doc] Fixed doc typo in DataStream API
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5283 Merging .. ---
[jira] [Closed] (FLINK-5848) make Flink Web Backend a little bit more restful
[ https://issues.apache.org/jira/browse/FLINK-5848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-5848. --- Resolution: Later Fix Version/s: 1.5.0 Subsumed by the REST backend rework. > make Flink Web Backend a little bit more restful > > > Key: FLINK-5848 > URL: https://issues.apache.org/jira/browse/FLINK-5848 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.2.0, 1.1.4 >Reporter: Fabian Wollert >Assignee: Fabian Wollert > Fix For: 1.5.0 > > > we are using the web backend for managing flink jobs (cancelling, starting, > etc.). Unfortunately the Backend is not completely RESTful, the responses are > mixed. > E.g. > https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java > is showing that if a error occurs in the backend, its not resulting in a > HTTP error code and the response is not JSON. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5746) Add acceptEither and applyToEither to Flink's Future
[ https://issues.apache.org/jira/browse/FLINK-5746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-5746. --- Resolution: Won't Fix Our Future class has been replaced with Java8 {{CompletableFuture}}s. > Add acceptEither and applyToEither to Flink's Future > > > Key: FLINK-5746 > URL: https://issues.apache.org/jira/browse/FLINK-5746 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Priority: Minor > > Flink's futures are missing the method {{acceptEither}} and {{applyToEither}} > in order to react to the completion of one of two futures. Adding them would > be helpful. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5691) Creating Reporter for elasticsearch 5.1.X causing conflicts of io.netty library
[ https://issues.apache.org/jira/browse/FLINK-5691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-5691. --- Resolution: Fixed Fix Version/s: 1.4.0 We have relocated our netty dependency to prevent conflicts. > Creating Reporter for elasticsearch 5.1.X causing conflicts of io.netty > library > --- > > Key: FLINK-5691 > URL: https://issues.apache.org/jira/browse/FLINK-5691 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.3.0 > Environment: linux >Reporter: prabhat kumar > Labels: features, maven > Fix For: 1.4.0 > > > Trying to write reporter for elasticsearch 5.1.x using TransportClient which > internally using io.netty version 4.1.6 which has a call to a method which is > not present in flink io.netty verion 4.0.27 causing error > *** > Exception in thread "elasticsearch[_client_][management][T#1]" > java.lang.NoSuchMethodError: > io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf; > at > org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78) > at > org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449) > at > org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91) > at > org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976) > at > org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958) > at > org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520) > at > org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465) > at > org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:482) > at > org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:458) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > *** > trying to upgrade the jar but in pom.xml of flink it's mentioned not to > upgrade else there could be memory issue > > io.netty > netty-all > > 4.0.27.Final > > Please suggest a workaround. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5453) REST API request reference incomplete
[ https://issues.apache.org/jira/browse/FLINK-5453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-5453. --- Resolution: Later Will be fixed in 1.5 as part of FLINK-8133. > REST API request reference incomplete > - > > Key: FLINK-5453 > URL: https://issues.apache.org/jira/browse/FLINK-5453 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0, 1.4.0 >Reporter: Chesnay Schepler > > The REST API documentation is supposed to contain a list of all available > requests, is however missing all requests related to > * jobmanager > * taskmanagers > * checkpoints > * metrics -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5403) Apache Flink Snapshot 1.2 Javadocs link does not have files like index.html, overview-summary.html or index-all.html
[ https://issues.apache.org/jira/browse/FLINK-5403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-5403. --- Resolution: Cannot Reproduce > Apache Flink Snapshot 1.2 Javadocs link does not have files like index.html, > overview-summary.html or index-all.html > - > > Key: FLINK-5403 > URL: https://issues.apache.org/jira/browse/FLINK-5403 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.1.4 >Reporter: Markus Dale >Priority: Minor > > The link http://flink.apache.org - Documentation - Snapshot (Development) - > 1.2 Javadocs > (https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/) > points to a directory that does not seem to contain an index.html file so the > Javadoc Frame does not get displayed. > The Javadoc html files for individual classes exist (e.g. > https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/org/apache/flink/client/cli/CancelOptions.html) > but when clicking from there to Overview or Frames > (https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/index.html?org/apache/flink/client/cli/CancelOptions.html) > those resources are not found. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8335) Upgrade hbase connector dependency to 1.4.0
[ https://issues.apache.org/jira/browse/FLINK-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-8335: -- Component/s: Batch Connectors and Input/Output Formats > Upgrade hbase connector dependency to 1.4.0 > --- > > Key: FLINK-8335 > URL: https://issues.apache.org/jira/browse/FLINK-8335 > Project: Flink > Issue Type: Improvement > Components: Batch Connectors and Input/Output Formats >Reporter: Ted Yu >Priority: Minor > > hbase 1.4.0 has been released. > 1.4.0 shows speed improvement over previous 1.x releases. > http://search-hadoop.com/m/HBase/YGbbBxedD1Mnm8t?subj=Re+VOTE+The+second+HBase+1+4+0+release+candidate+RC1+is+available > This issue is to upgrade the dependency to 1.4.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8418) Kafka08ITCase.testStartFromLatest() times out on Travis
Tzu-Li (Gordon) Tai created FLINK-8418: -- Summary: Kafka08ITCase.testStartFromLatest() times out on Travis Key: FLINK-8418 URL: https://issues.apache.org/jira/browse/FLINK-8418 Project: Flink Issue Type: Bug Components: Kafka Connector, Tests Affects Versions: 1.4.0, 1.5.0 Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Priority: Critical Fix For: 1.5.0, 1.4.1 Instance: https://travis-ci.org/kl0u/flink/builds/327733085 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8418) Kafka08ITCase.testStartFromLatestOffsets() times out on Travis
[ https://issues.apache.org/jira/browse/FLINK-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8418: --- Summary: Kafka08ITCase.testStartFromLatestOffsets() times out on Travis (was: Kafka08ITCase.testStartFromLatest() times out on Travis) > Kafka08ITCase.testStartFromLatestOffsets() times out on Travis > -- > > Key: FLINK-8418 > URL: https://issues.apache.org/jira/browse/FLINK-8418 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > Fix For: 1.5.0, 1.4.1 > > > Instance: https://travis-ci.org/kl0u/flink/builds/327733085 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
[ https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323281#comment-16323281 ] Tzu-Li (Gordon) Tai commented on FLINK-8073: Another instance: https://api.travis-ci.org/v3/job/327769316/log.txt > Test instability > FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint() > - > > Key: FLINK-8073 > URL: https://issues.apache.org/jira/browse/FLINK-8073 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Priority: Critical > Labels: test-stability > > Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8417) Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8417: --- Description: As discussed in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html. Users need the functionality to access cross-account AWS Kinesis streams, using AWS Temporary Credentials [1]. We should add support for {{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally would use the {{STSAssumeRoleSessionCredentialsProvider}} [2] in {{AWSUtil#getCredentialsProvider(Properties)}}. [1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html [2] https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html was: As discussed in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html. Users need the functionality to access cross-account AWS Kinesis streams, using AWS Temporary Credentials [1]. We should add support for {{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally would use the {{STSAssumeRoleSessionCredentialsProvider}} in {{AWSUtil#getCredentialsProvider(Properties)}}. [1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html > Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer > --- > > Key: FLINK-8417 > URL: https://issues.apache.org/jira/browse/FLINK-8417 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.5.0 > > > As discussed in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html. > Users need the functionality to access cross-account AWS Kinesis streams, > using AWS Temporary Credentials [1]. > We should add support for > {{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally > would use the {{STSAssumeRoleSessionCredentialsProvider}} [2] in > {{AWSUtil#getCredentialsProvider(Properties)}}. > [1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html > [2] > https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8417) Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer
Tzu-Li (Gordon) Tai created FLINK-8417: -- Summary: Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer Key: FLINK-8417 URL: https://issues.apache.org/jira/browse/FLINK-8417 Project: Flink Issue Type: New Feature Components: Kinesis Connector Reporter: Tzu-Li (Gordon) Tai Fix For: 1.5.0 As discussed in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html. Users need the functionality to access cross-account AWS Kinesis streams, using AWS Temporary Credentials [1]. We should add support for {{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally would use the {{STSAssumeRoleSessionCredentialsProvider}} in {{AWSUtil#getCredentialsProvider(Properties)}}. [1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8416) Kinesis consumer doc examples should demonstrate preferred default credentials provider
Tzu-Li (Gordon) Tai created FLINK-8416: -- Summary: Kinesis consumer doc examples should demonstrate preferred default credentials provider Key: FLINK-8416 URL: https://issues.apache.org/jira/browse/FLINK-8416 Project: Flink Issue Type: Improvement Components: Documentation, Kinesis Connector Reporter: Tzu-Li (Gordon) Tai Fix For: 1.3.3, 1.5.0, 1.4.1 The Kinesis consumer docs [here](https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kinesis.html#kinesis-consumer) demonstrate providing credentials by explicitly supplying the AWS Access ID and Key. The always preferred approach for AWS, unless running locally, is to automatically fetch the shipped credentials from the AWS environment. That is actually the default behaviour of the Kinesis consumer, so the docs should demonstrate that more clearly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8296) Rework FlinkKafkaConsumerBestTest to not use Java reflection for dependency injection
[ https://issues.apache.org/jira/browse/FLINK-8296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323208#comment-16323208 ] ASF GitHub Bot commented on FLINK-8296: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5188 Thanks! Merging this ... > Rework FlinkKafkaConsumerBestTest to not use Java reflection for dependency > injection > - > > Key: FLINK-8296 > URL: https://issues.apache.org/jira/browse/FLINK-8296 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.5.0, 1.4.1 > > > The current {{FlinkKafkaConsumerBaseTest}} is heavily relying on Java > reflection for dependency injection. Using reflection to compose unit tests > really should be a last resort, and indicates that the tests there are highly > implementation-specific, and that we should make the design more testable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5188: [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5188 Thanks! Merging this ... ---
[jira] [Closed] (FLINK-8333) Split command options from deployment options in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8333. Resolution: Fixed Fixed via 12396f19851e74310c9b5f28870a8de9794511fc > Split command options from deployment options in CliFrontend > > > Key: FLINK-8333 > URL: https://issues.apache.org/jira/browse/FLINK-8333 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to better support different {{CustomCommandLines}} we should split > the command and deployment option parsing in the {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling
[ https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323194#comment-16323194 ] ASF GitHub Bot commented on FLINK-8299: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5207 > Retrieve ExecutionResult by REST polling > > > Key: FLINK-8299 > URL: https://issues.apache.org/jira/browse/FLINK-8299 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8333) Split command options from deployment options in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323192#comment-16323192 ] ASF GitHub Bot commented on FLINK-8333: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5220 > Split command options from deployment options in CliFrontend > > > Key: FLINK-8333 > URL: https://issues.apache.org/jira/browse/FLINK-8333 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to better support different {{CustomCommandLines}} we should split > the command and deployment option parsing in the {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5225: [FLINK-8339] [flip6] Let CustomCommandLine return ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5225 ---
[jira] [Commented] (FLINK-8338) Make CustomCommandLines non static in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-8338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323193#comment-16323193 ] ASF GitHub Bot commented on FLINK-8338: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5224 > Make CustomCommandLines non static in CliFrontend > - > > Key: FLINK-8338 > URL: https://issues.apache.org/jira/browse/FLINK-8338 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > For better testability and maintainability we should make the > {{CustomCommandLine}} registration non-static in {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5207 ---
[jira] [Closed] (FLINK-8338) Make CustomCommandLines non static in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-8338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8338. Resolution: Fixed Fixed via aff43768f3285a5f2bc5593369a7fec3ed77a2af > Make CustomCommandLines non static in CliFrontend > - > > Key: FLINK-8338 > URL: https://issues.apache.org/jira/browse/FLINK-8338 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > For better testability and maintainability we should make the > {{CustomCommandLine}} registration non-static in {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323191#comment-16323191 ] ASF GitHub Bot commented on FLINK-8339: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5225 > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5224: [FLINK-8338] [flip6] Make CustomCommandLines non s...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5224 ---
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5220 ---
[jira] [Closed] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8339. Resolution: Fixed Fixed via e2f1ba92decdb27f3aea4e21a7cad7dcc98cea1a > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-8299) Retrieve ExecutionResult by REST polling
[ https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8299. -- Resolution: Fixed Fixed via 06922753a55dc322b96919ebb407d531e2b79d3e > Retrieve ExecutionResult by REST polling > > > Key: FLINK-8299 > URL: https://issues.apache.org/jira/browse/FLINK-8299 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8347) Make Cluster id typesafe
[ https://issues.apache.org/jira/browse/FLINK-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323107#comment-16323107 ] ASF GitHub Bot commented on FLINK-8347: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5232#discussion_r161094149 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -367,40 +366,31 @@ public ClusterClient retrieve(String applicationID) { flinkConfiguration, false); } catch (Exception e) { - throw new RuntimeException("Couldn't retrieve Yarn cluster", e); + throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e); } } @Override - public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) { + public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException { try { return deployInternal( clusterSpecification, getYarnSessionClusterEntrypoint(), null); } catch (Exception e) { - throw new RuntimeException("Couldn't deploy Yarn session cluster", e); + throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e); } } @Override - public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) { + public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) throws ClusterDeploymentException { try { return deployInternal( clusterSpecification, getYarnJobClusterEntrypoint(), jobGraph); } catch (Exception e) { - throw new RuntimeException("Could not deploy Yarn job cluster.", e); - } - } - - @Override --- End diff -- Good catch. Will add it. > Make Cluster id typesafe > > > Key: FLINK-8347 > URL: https://issues.apache.org/jira/browse/FLINK-8347 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the cluster id is of type {{String}}. We should make the id > typesafe to avoid mixups between different {{CustomCommandLines}} and > {{ClusterDescriptors}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5232: [FLINK-8347] [flip6] Make cluster id used by Clust...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5232#discussion_r161094149 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -367,40 +366,31 @@ public ClusterClient retrieve(String applicationID) { flinkConfiguration, false); } catch (Exception e) { - throw new RuntimeException("Couldn't retrieve Yarn cluster", e); + throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e); } } @Override - public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) { + public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException { try { return deployInternal( clusterSpecification, getYarnSessionClusterEntrypoint(), null); } catch (Exception e) { - throw new RuntimeException("Couldn't deploy Yarn session cluster", e); + throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e); } } @Override - public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) { + public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) throws ClusterDeploymentException { try { return deployInternal( clusterSpecification, getYarnJobClusterEntrypoint(), jobGraph); } catch (Exception e) { - throw new RuntimeException("Could not deploy Yarn job cluster.", e); - } - } - - @Override --- End diff -- Good catch. Will add it. ---
[jira] [Commented] (FLINK-8343) Add support for job cluster deployment
[ https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323104#comment-16323104 ] ASF GitHub Bot commented on FLINK-8343: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5229 Thanks for the review @GJL. Merged onto the latest master. > Add support for job cluster deployment > -- > > Key: FLINK-8343 > URL: https://issues.apache.org/jira/browse/FLINK-8343 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > For Flip-6 we have to enable a different job cluster deployment. The > difference is that we directly submit the job when we deploy the Flink > cluster instead of following a two step approach (first deployment and then > submission). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5229: [FLINK-8343] [flip6] Remove Yarn specific commands from Y...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5229 Thanks for the review @GJL. Merged onto the latest master. ---
[GitHub] flink pull request #5229: [FLINK-8343] [flip6] Remove Yarn specific commands...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5229#discussion_r161089633 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -534,215 +557,235 @@ private Configuration applyYarnProperties(Configuration configuration) throws Fl return effectiveConfiguration; } - public int run( - String[] args, - Configuration configuration, - String configurationDirectory) { + public int run(String[] args) throws CliArgsException, FlinkException { // // Command Line Options // - Options options = new Options(); - addGeneralOptions(options); - addRunOptions(options); + final CommandLine cmd = parseCommandLineOptions(args, true); - CommandLineParser parser = new PosixParser(); - CommandLine cmd; - try { - cmd = parser.parse(options, args); - } catch (Exception e) { - System.out.println(e.getMessage()); - printUsage(); - return 1; - } + final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd); - // Query cluster for metrics - if (cmd.hasOption(query.getOpt())) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); - String description; - try { - description = yarnDescriptor.getClusterDescription(); - } catch (Exception e) { - System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - System.out.println(description); - return 0; - } else if (cmd.hasOption(applicationId.getOpt())) { + try { + // Query cluster for metrics + if (cmd.hasOption(query.getOpt())) { + final String description = yarnClusterDescriptor.getClusterDescription(); + System.out.println(description); + return 0; + } else { + final ClusterClient clusterClient; + final ApplicationId yarnApplicationId; - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); + if (cmd.hasOption(applicationId.getOpt())) { + yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt())); - //configure ZK namespace depending on the value passed - String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ? - cmd.getOptionValue(zookeeperNamespace.getOpt()) - : yarnDescriptor.getFlinkConfiguration() - .getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt())); - LOG.info("Going to use the ZK namespace: {}", zkNamespace); - yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace); + clusterClient = yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt())); + } else { + final ClusterSpecification clusterSpecification = getClusterSpecification(cmd); - try { - yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt())); - } catch (Exception e) { - throw new RuntimeException("Could not retrieve existing Yarn application", e); - } + clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification); - if (detachedMode) { -
[jira] [Commented] (FLINK-8343) Add support for job cluster deployment
[ https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323060#comment-16323060 ] ASF GitHub Bot commented on FLINK-8343: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5229#discussion_r161090177 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -534,215 +557,235 @@ private Configuration applyYarnProperties(Configuration configuration) throws Fl return effectiveConfiguration; } - public int run( - String[] args, - Configuration configuration, - String configurationDirectory) { + public int run(String[] args) throws CliArgsException, FlinkException { // // Command Line Options // - Options options = new Options(); - addGeneralOptions(options); - addRunOptions(options); + final CommandLine cmd = parseCommandLineOptions(args, true); - CommandLineParser parser = new PosixParser(); - CommandLine cmd; - try { - cmd = parser.parse(options, args); - } catch (Exception e) { - System.out.println(e.getMessage()); - printUsage(); - return 1; - } + final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd); - // Query cluster for metrics - if (cmd.hasOption(query.getOpt())) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); - String description; - try { - description = yarnDescriptor.getClusterDescription(); - } catch (Exception e) { - System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - System.out.println(description); - return 0; - } else if (cmd.hasOption(applicationId.getOpt())) { + try { + // Query cluster for metrics + if (cmd.hasOption(query.getOpt())) { + final String description = yarnClusterDescriptor.getClusterDescription(); + System.out.println(description); + return 0; + } else { + final ClusterClient clusterClient; + final ApplicationId yarnApplicationId; - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); + if (cmd.hasOption(applicationId.getOpt())) { + yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt())); - //configure ZK namespace depending on the value passed - String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ? - cmd.getOptionValue(zookeeperNamespace.getOpt()) - : yarnDescriptor.getFlinkConfiguration() - .getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt())); - LOG.info("Going to use the ZK namespace: {}", zkNamespace); --- End diff -- It should not matter. > Add support for job cluster deployment > -- > > Key: FLINK-8343 > URL: https://issues.apache.org/jira/browse/FLINK-8343 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > For Flip-6 we have to enable a different job cluster deployment. The > difference is that we directly submit the job when we deploy the Flink > cluster instead of following a two step approach (first deployment and
[GitHub] flink pull request #5229: [FLINK-8343] [flip6] Remove Yarn specific commands...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5229#discussion_r161090177 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -534,215 +557,235 @@ private Configuration applyYarnProperties(Configuration configuration) throws Fl return effectiveConfiguration; } - public int run( - String[] args, - Configuration configuration, - String configurationDirectory) { + public int run(String[] args) throws CliArgsException, FlinkException { // // Command Line Options // - Options options = new Options(); - addGeneralOptions(options); - addRunOptions(options); + final CommandLine cmd = parseCommandLineOptions(args, true); - CommandLineParser parser = new PosixParser(); - CommandLine cmd; - try { - cmd = parser.parse(options, args); - } catch (Exception e) { - System.out.println(e.getMessage()); - printUsage(); - return 1; - } + final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd); - // Query cluster for metrics - if (cmd.hasOption(query.getOpt())) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); - String description; - try { - description = yarnDescriptor.getClusterDescription(); - } catch (Exception e) { - System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - System.out.println(description); - return 0; - } else if (cmd.hasOption(applicationId.getOpt())) { + try { + // Query cluster for metrics + if (cmd.hasOption(query.getOpt())) { + final String description = yarnClusterDescriptor.getClusterDescription(); + System.out.println(description); + return 0; + } else { + final ClusterClient clusterClient; + final ApplicationId yarnApplicationId; - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); + if (cmd.hasOption(applicationId.getOpt())) { + yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt())); - //configure ZK namespace depending on the value passed - String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ? - cmd.getOptionValue(zookeeperNamespace.getOpt()) - : yarnDescriptor.getFlinkConfiguration() - .getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt())); - LOG.info("Going to use the ZK namespace: {}", zkNamespace); --- End diff -- It should not matter. ---
[jira] [Commented] (FLINK-8343) Add support for job cluster deployment
[ https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323054#comment-16323054 ] ASF GitHub Bot commented on FLINK-8343: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5229#discussion_r161089633 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -534,215 +557,235 @@ private Configuration applyYarnProperties(Configuration configuration) throws Fl return effectiveConfiguration; } - public int run( - String[] args, - Configuration configuration, - String configurationDirectory) { + public int run(String[] args) throws CliArgsException, FlinkException { // // Command Line Options // - Options options = new Options(); - addGeneralOptions(options); - addRunOptions(options); + final CommandLine cmd = parseCommandLineOptions(args, true); - CommandLineParser parser = new PosixParser(); - CommandLine cmd; - try { - cmd = parser.parse(options, args); - } catch (Exception e) { - System.out.println(e.getMessage()); - printUsage(); - return 1; - } + final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd); - // Query cluster for metrics - if (cmd.hasOption(query.getOpt())) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); - String description; - try { - description = yarnDescriptor.getClusterDescription(); - } catch (Exception e) { - System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - System.out.println(description); - return 0; - } else if (cmd.hasOption(applicationId.getOpt())) { + try { + // Query cluster for metrics + if (cmd.hasOption(query.getOpt())) { + final String description = yarnClusterDescriptor.getClusterDescription(); + System.out.println(description); + return 0; + } else { + final ClusterClient clusterClient; + final ApplicationId yarnApplicationId; - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); + if (cmd.hasOption(applicationId.getOpt())) { + yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt())); - //configure ZK namespace depending on the value passed - String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ? - cmd.getOptionValue(zookeeperNamespace.getOpt()) - : yarnDescriptor.getFlinkConfiguration() - .getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt())); - LOG.info("Going to use the ZK namespace: {}", zkNamespace); - yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace); + clusterClient = yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt())); + } else { + final ClusterSpecification clusterSpecification = getClusterSpecification(cmd); - try { - yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt())); - } catch (Exception e) { - throw new RuntimeException("Could not retrieve
[GitHub] flink pull request #5228: [FLINK-8342] [flip6] Remove generic type parameter...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5228#discussion_r161087011 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -99,7 +99,7 @@ /** * The descriptor with deployment information for spawning or resuming a {@link YarnClusterClient}. */ -public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor { +public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor { private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class); --- End diff -- Not intended. Will change it. ---
[jira] [Commented] (FLINK-8342) Remove ClusterClient generic type parameter from ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323041#comment-16323041 ] ASF GitHub Bot commented on FLINK-8342: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5228#discussion_r161087011 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -99,7 +99,7 @@ /** * The descriptor with deployment information for spawning or resuming a {@link YarnClusterClient}. */ -public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor { +public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor { private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class); --- End diff -- Not intended. Will change it. > Remove ClusterClient generic type parameter from ClusterDescriptor > -- > > Key: FLINK-8342 > URL: https://issues.apache.org/jira/browse/FLINK-8342 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{ClusterDescriptor}} should not specialize the returned > {{ClusterClient}} type in order to develop code which can work with all > {{ClusterDescriptors}} and {{ClusterClients}}. Therefore, I propose to remove > the generic type parameter from {{ClusterDescriptor}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8341) Remove unneeded CommandLineOptions
[ https://issues.apache.org/jira/browse/FLINK-8341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323034#comment-16323034 ] ASF GitHub Bot commented on FLINK-8341: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5227 Thanks for the review @GJL. Addressing comments and merging once Travis gives green light. > Remove unneeded CommandLineOptions > -- > > Key: FLINK-8341 > URL: https://issues.apache.org/jira/browse/FLINK-8341 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > With the refactorings of the {{CliFrontend}} we no longer have to keep the > JobManager address and commandLine in the {{CommandLineOptions}}. Therefore, > these fields should be removed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5227: [FLINK-8341] [flip6] Remove not needed options from Comma...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5227 Thanks for the review @GJL. Addressing comments and merging once Travis gives green light. ---
[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling
[ https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322854#comment-16322854 ] ASF GitHub Bot commented on FLINK-8299: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5207 Merging this PR. Thanks for your contribution @GJL. > Retrieve ExecutionResult by REST polling > > > Key: FLINK-8299 > URL: https://issues.apache.org/jira/browse/FLINK-8299 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5207: [FLINK-8299][flip6] Poll JobExecutionResult after job sub...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5207 Merging this PR. Thanks for your contribution @GJL. ---
[jira] [Created] (FLINK-8415) Unprotected access to recordsToSend in LongRecordWriterThread#shutdown()
Ted Yu created FLINK-8415: - Summary: Unprotected access to recordsToSend in LongRecordWriterThread#shutdown() Key: FLINK-8415 URL: https://issues.apache.org/jira/browse/FLINK-8415 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor {code} public void shutdown() { running = false; recordsToSend.complete(0L); {code} In other methods, access to recordsToSend is protected by synchronized keyword. shutdown() should do the same. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322789#comment-16322789 ] Elias Levy commented on FLINK-7935: --- So it seems the DD reporter needs to switch from {{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} to {{MetricGroup#getMetricIdentifier(String)}}. That would be sufficient for my immediate use case, as I am only looking to add a single user supplied scope/tag. That said, I can see {{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} becoming cumbersome if a user wishes to use multiple key-values/tags. E.g. {code} getRuntimeContext() .getMetricGroup() .addGroup("messages") .addGroup("type", messageType) .addGroup("source", messageSource) .addGroup("priority", messagePriority) .counter("count") {code} would be named {{.messages.type.source.priority.count}} instead of just {{.messages.count}} with variables/tags {{type}}, {{source}}, and {{priority}}. > Metrics with user supplied scope variables > -- > > Key: FLINK-7935 > URL: https://issues.apache.org/jira/browse/FLINK-7935 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.2 >Reporter: Elias Levy > > We use DataDog for metrics. DD and Flink differ somewhat in how they track > metrics. > Flink names and scopes metrics together, at least by default. E.g. by default > the System scope for operator metrics is > {{.taskmanager}}. > The scope variables become part of the metric's full name. > In DD the metric would be named something generic, e.g. > {{taskmanager.job.operator}}, and they would be distinguished by their tag > values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. > Flink allows you to configure the format string for system scopes, so it is > possible to set the operator scope format to {{taskmanager.job.operator}}. > We do this for all scopes: > {code} > metrics.scope.jm: jobmanager > metrics.scope.jm.job: jobmanager.job > metrics.scope.tm: taskmanager > metrics.scope.tm.job: taskmanager.job > metrics.scope.task: taskmanager.job.task > metrics.scope.operator: taskmanager.job.operator > {code} > This seems to work. The DataDog Flink metric's plugin submits all scope > variables as tags, even if they are not used within the scope format. And it > appears internally this does not lead to metrics conflicting with each other. > We would like to extend this to user defined metrics, but you can define > variables/scopes when adding a metric group or metric with the user API, so > that in DD we have a single metric with a tag with many different values, > rather than hundreds of metrics to just the one value we want to measure > across different event types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling
[ https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322541#comment-16322541 ] ASF GitHub Bot commented on FLINK-8299: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5207#discussion_r161010273 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } + private JobExecutionResult waitForJobExecutionResult( + final JobID jobId) throws ProgramInvocationException { + + final JobMessageParameters messageParameters = new JobMessageParameters(); + messageParameters.jobPathParameter.resolve(jobId); + JobExecutionResultResponseBody jobExecutionResultResponseBody; + try { + long attempt = 0; + do { + final CompletableFuture responseFuture = + restClient.sendRequest( + restClusterClientConfiguration.getRestServerAddress(), + restClusterClientConfiguration.getRestServerPort(), + JobExecutionResultHeaders.getInstance(), + messageParameters); + jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + Thread.sleep(waitStrategy.sleepTime(attempt)); + attempt++; + } + while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); --- End diff -- Alright, then let's do it there. > Retrieve ExecutionResult by REST polling > > > Key: FLINK-8299 > URL: https://issues.apache.org/jira/browse/FLINK-8299 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5207#discussion_r161010273 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } + private JobExecutionResult waitForJobExecutionResult( + final JobID jobId) throws ProgramInvocationException { + + final JobMessageParameters messageParameters = new JobMessageParameters(); + messageParameters.jobPathParameter.resolve(jobId); + JobExecutionResultResponseBody jobExecutionResultResponseBody; + try { + long attempt = 0; + do { + final CompletableFuture responseFuture = + restClient.sendRequest( + restClusterClientConfiguration.getRestServerAddress(), + restClusterClientConfiguration.getRestServerPort(), + JobExecutionResultHeaders.getInstance(), + messageParameters); + jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + Thread.sleep(waitStrategy.sleepTime(attempt)); + attempt++; + } + while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); --- End diff -- Alright, then let's do it there. ---
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5194 ---
[jira] [Commented] (FLINK-8233) Expose JobExecutionResult via HTTP
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322480#comment-16322480 ] ASF GitHub Bot commented on FLINK-8233: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5194 > Expose JobExecutionResult via HTTP > -- > > Key: FLINK-8233 > URL: https://issues.apache.org/jira/browse/FLINK-8233 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Expose {{JobExecutionResult}} from a finished Flink job via HTTP: > * Add a new AbstractRestHandler that returns the information in > {{JobExecutionResult}}. > * Register new handler in {{WebMonitorEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322477#comment-16322477 ] ASF GitHub Bot commented on FLINK-8328: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5215 > Pull Yarn ApplicationStatus polling out of YarnClusterClient > > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has > also the benefit of separating concerns better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8329) Move YarnClient out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8329. Resolution: Fixed Fixed via 156b8935ef76eb53456cea1d40fd528ccefa21d8 > Move YarnClient out of YarnClusterClient > > > Key: FLINK-8329 > URL: https://issues.apache.org/jira/browse/FLINK-8329 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Move the {{YarnClient}} from the {{YarnClusterClient}} to the > {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle > management of the {{YarnClient}}. This change is a clean up task which will > better structure the client code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322478#comment-16322478 ] ASF GitHub Bot commented on FLINK-8329: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5216 > Move YarnClient out of YarnClusterClient > > > Key: FLINK-8329 > URL: https://issues.apache.org/jira/browse/FLINK-8329 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Move the {{YarnClient}} from the {{YarnClusterClient}} to the > {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle > management of the {{YarnClient}}. This change is a clean up task which will > better structure the client code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8328. Resolution: Fixed Fixed via 2ce5b98da04cb3850ff91757cc4b74a98b8ce082 > Pull Yarn ApplicationStatus polling out of YarnClusterClient > > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has > also the benefit of separating concerns better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYa...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5216 ---
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5219 ---
[jira] [Closed] (FLINK-8332) Move dispose savepoint into ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8332. Resolution: Fixed Fixed via c2492e9b220c6c9a64b47bcdc76a2194d9f4d669 > Move dispose savepoint into ClusterClient > - > > Key: FLINK-8332 > URL: https://issues.apache.org/jira/browse/FLINK-8332 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{CliFrontend}} sends the command for disposing a savepoint. > In order to better abstract this functionality we should move it to the > {{ClusterClient}}. That way we can have different implementations of the > {{ClusterClient}} (Flip-6 and old code) which are used by the same > {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322479#comment-16322479 ] ASF GitHub Bot commented on FLINK-8332: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5219 > Move dispose savepoint into ClusterClient > - > > Key: FLINK-8332 > URL: https://issues.apache.org/jira/browse/FLINK-8332 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{CliFrontend}} sends the command for disposing a savepoint. > In order to better abstract this functionality we should move it to the > {{ClusterClient}}. That way we can have different implementations of the > {{ClusterClient}} (Flip-6 and old code) which are used by the same > {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5215 ---
[jira] [Closed] (FLINK-8233) Expose JobExecutionResult via HTTP
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8233. Resolution: Fixed Fixed via 86892b8e76a4e4b26cedf38c0695c53814a7f04f > Expose JobExecutionResult via HTTP > -- > > Key: FLINK-8233 > URL: https://issues.apache.org/jira/browse/FLINK-8233 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Expose {{JobExecutionResult}} from a finished Flink job via HTTP: > * Add a new AbstractRestHandler that returns the information in > {{JobExecutionResult}}. > * Register new handler in {{WebMonitorEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5207#discussion_r160997382 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } + private JobExecutionResult waitForJobExecutionResult( + final JobID jobId) throws ProgramInvocationException { + + final JobMessageParameters messageParameters = new JobMessageParameters(); + messageParameters.jobPathParameter.resolve(jobId); + JobExecutionResultResponseBody jobExecutionResultResponseBody; + try { + long attempt = 0; + do { + final CompletableFuture responseFuture = + restClient.sendRequest( + restClusterClientConfiguration.getRestServerAddress(), + restClusterClientConfiguration.getRestServerPort(), + JobExecutionResultHeaders.getInstance(), + messageParameters); + jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + Thread.sleep(waitStrategy.sleepTime(attempt)); + attempt++; + } + while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); --- End diff -- Can change it to `getStatus().getId()` to avoid redundancy. However, the code will be touched once more in #5223. I can do it there. ---
[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling
[ https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322467#comment-16322467 ] ASF GitHub Bot commented on FLINK-8299: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5207#discussion_r160997382 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } + private JobExecutionResult waitForJobExecutionResult( + final JobID jobId) throws ProgramInvocationException { + + final JobMessageParameters messageParameters = new JobMessageParameters(); + messageParameters.jobPathParameter.resolve(jobId); + JobExecutionResultResponseBody jobExecutionResultResponseBody; + try { + long attempt = 0; + do { + final CompletableFuture responseFuture = + restClient.sendRequest( + restClusterClientConfiguration.getRestServerAddress(), + restClusterClientConfiguration.getRestServerPort(), + JobExecutionResultHeaders.getInstance(), + messageParameters); + jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + Thread.sleep(waitStrategy.sleepTime(attempt)); + attempt++; + } + while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); --- End diff -- Can change it to `getStatus().getId()` to avoid redundancy. However, the code will be touched once more in #5223. I can do it there. > Retrieve ExecutionResult by REST polling > > > Key: FLINK-8299 > URL: https://issues.apache.org/jira/browse/FLINK-8299 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5225: [FLINK-8339] [flip6] Let CustomCommandLine return ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160995333 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java --- @@ -51,45 +45,23 @@ public String getId() { } @Override - public void addRunOptions(Options baseOptions) { - } + public ClusterDescriptor createClusterDescriptor( + Configuration configuration, + String configurationDirectory, + CommandLine commandLine) { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine); - @Override - public void addGeneralOptions(Options baseOptions) { + return new StandaloneClusterDescriptor(effectiveConfiguration); } @Override - public StandaloneClusterClient retrieveCluster( - CommandLine commandLine, - Configuration config, - String configurationDirectory) { - - if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) { - String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt()); - InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort); - setJobManagerAddressInConfig(config, jobManagerAddress); - } - - if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) { - String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt()); - config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); - } - - StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); - return descriptor.retrieve(null); + @Nullable + public String getClusterId(Configuration configuration, CommandLine commandLine) { + return "standalone"; } @Override - public StandaloneClusterClient createCluster( - String applicationName, - CommandLine commandLine, - Configuration config, - String configurationDirectory, - List userJarFiles) throws UnsupportedOperationException { - - StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); - ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config); - - return descriptor.deploySessionCluster(clusterSpecification); + public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); --- End diff -- ok ---
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322459#comment-16322459 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160995333 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java --- @@ -51,45 +45,23 @@ public String getId() { } @Override - public void addRunOptions(Options baseOptions) { - } + public ClusterDescriptor createClusterDescriptor( + Configuration configuration, + String configurationDirectory, + CommandLine commandLine) { + final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine); - @Override - public void addGeneralOptions(Options baseOptions) { + return new StandaloneClusterDescriptor(effectiveConfiguration); } @Override - public StandaloneClusterClient retrieveCluster( - CommandLine commandLine, - Configuration config, - String configurationDirectory) { - - if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) { - String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt()); - InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort); - setJobManagerAddressInConfig(config, jobManagerAddress); - } - - if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) { - String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt()); - config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); - } - - StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); - return descriptor.retrieve(null); + @Nullable + public String getClusterId(Configuration configuration, CommandLine commandLine) { + return "standalone"; } @Override - public StandaloneClusterClient createCluster( - String applicationName, - CommandLine commandLine, - Configuration config, - String configurationDirectory, - List userJarFiles) throws UnsupportedOperationException { - - StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); - ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config); - - return descriptor.deploySessionCluster(clusterSpecification); + public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) { + return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); --- End diff -- ok > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8233) Expose JobExecutionResult via HTTP
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322452#comment-16322452 ] ASF GitHub Bot commented on FLINK-8233: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5194 Travis passed. Merging this PR. Thanks for your contribution. > Expose JobExecutionResult via HTTP > -- > > Key: FLINK-8233 > URL: https://issues.apache.org/jira/browse/FLINK-8233 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Expose {{JobExecutionResult}} from a finished Flink job via HTTP: > * Add a new AbstractRestHandler that returns the information in > {{JobExecutionResult}}. > * Register new handler in {{WebMonitorEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322453#comment-16322453 ] ASF GitHub Bot commented on FLINK-8339: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160994352 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception { final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, configuration, configurationDirectory); + final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor( --- End diff -- ok > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5225: [FLINK-8339] [flip6] Let CustomCommandLine return ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160994352 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception { final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, configuration, configurationDirectory); + final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor( --- End diff -- ok ---
[GitHub] flink issue #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5194 Travis passed. Merging this PR. Thanks for your contribution. ---
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322425#comment-16322425 ] ASF GitHub Bot commented on FLINK-8339: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5225 Thanks for the review @GJL. I've addressed most of your comments and rebased onto the latest master. > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5225: [FLINK-8339] [flip6] Let CustomCommandLine return Cluster...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5225 Thanks for the review @GJL. I've addressed most of your comments and rebased onto the latest master. ---
[jira] [Commented] (FLINK-6892) Add L/RPAD supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322397#comment-16322397 ] ASF GitHub Bot commented on FLINK-6892: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/4127 @twalthr Thanks for the review! I have update the PR. I will be very grateful if you can review again. Thanks, Jincheng > Add L/RPAD supported in SQL > --- > > Key: FLINK-6892 > URL: https://issues.apache.org/jira/browse/FLINK-6892 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > > L/RPAD(str,len,padstr) Returns the string str, left/right-padded with the > string padstr to a length of len characters. If str is longer than len, the > return value is shortened to len characters. > * Syntax: > LPAD(str,len,padstr) > * Arguments > **str: - > **len: - > **padstr: - > * Return Types > String > * Example: > LPAD('hi',4,'??') -> '??hi' > LPAD('hi',1,'??') -> 'h' > RPAD('hi',4,'??') -> 'hi??' > RPAD('hi',1,'??') -> 'h' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_lpad] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4127: [FLINK-6892][table]Add L/RPAD supported in SQL
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/4127 @twalthr Thanks for the review! I have update the PR. I will be very grateful if you can review again. Thanks, Jincheng ---
[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor
[ https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322396#comment-16322396 ] ASF GitHub Bot commented on FLINK-8339: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160985701 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception { final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, configuration, configurationDirectory); + final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor( --- End diff -- You're right. I would like to postpone addressing this issue until we've introduce the typed cluster id. Otherwise I fear that it would inflict quite some merge conflicts. > Let CustomCommandLine return a ClusterDescriptor > > > Key: FLINK-8339 > URL: https://issues.apache.org/jira/browse/FLINK-8339 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} > and deploy a cluster. In order to better separate concerns it would be good > if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} > which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink > cluster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5225: [FLINK-8339] [flip6] Let CustomCommandLine return ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160985701 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception { final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine); - final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, configuration, configurationDirectory); + final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor( --- End diff -- You're right. I would like to postpone addressing this issue until we've introduce the typed cluster id. Otherwise I fear that it would inflict quite some merge conflicts. ---