[jira] [Commented] (FLINK-8413) Snapshot state of aggregated data is not maintained in flink's checkpointing

2018-01-11 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323645#comment-16323645
 ] 

Fabian Hueske commented on FLINK-8413:
--

This might also be an issue with Beam's Flink runner. In that case, the issue 
should be reported to Beam's JIRA.
[~aljoscha] might be able to help here.

> Snapshot state of aggregated data is not maintained in flink's checkpointing
> 
>
> Key: FLINK-8413
> URL: https://issues.apache.org/jira/browse/FLINK-8413
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: suganya
>
> We have a project that consumes events from Kafka and does a groupBy in a 
> time window (5 mins); after the window elapses it pushes the events 
> downstream for merge. The project is deployed using Flink, and we have 
> enabled checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). 
> Event offsets are checkpointed before the entire DAG (groupBy and merge) 
> finishes, so in case of any restart of the task manager, the new task starts 
> from the last successful checkpoint, but we could not get the aggregated 
> snapshot data (data from the groupBy task) from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka, 
> but could not get the aggregated data accumulated up to the checkpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2018-01-11 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323611#comment-16323611
 ] 

Bowen Li commented on FLINK-3089:
-

[~xfournet] yes, supporting TTL only in processing time would be another 
important assumption I'd like to make!

Implementing TTL in HeapStateBackend can be a bit tricky because of the number 
of timers, as you mentioned. W.r.t. the development plan, I think we can 
probably add the interface and support TTL in RocksDBStateBackend first. Then 
we can decide 1) whether to make it a RocksDBStateBackend-only feature, like 
incremental checkpointing, and 2) if we should support TTL in HeapStateBackend, 
how to implement it.

What do you think?

cc [~srichter] [~aljoscha]
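To make the HeapStateBackend trade-off concrete, here is a minimal plain-Java sketch of the timer-free alternative: lazy expiration, where each entry records its write time and is dropped on the next read after the TTL elapses. `TtlMapSketch`, its methods, and the injectable clock are illustrative assumptions, not Flink's state API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Illustrative sketch: lazy TTL enforcement on read, avoiding one timer per entry.
public class TtlMapSketch<K, V> {
    private static class Entry<V> {
        final V value;
        final long writtenAt;
        Entry(V value, long writtenAt) { this.value = value; this.writtenAt = writtenAt; }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable processing-time clock, useful for testing

    public TtlMapSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    public void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    // Returns null if the key is absent or its TTL has expired; an expired entry
    // is physically removed on access rather than by a timer.
    public V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) return null;
        if (clock.getAsLong() - e.writtenAt >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }
}
```

The obvious cost of this approach is that entries a job never reads again linger until some background cleanup (e.g. a compaction filter in RocksDB) removes them.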

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>Assignee: Bowen Li
>
> In some use cases (web analytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid)).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "after 30 minutes without events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information, but that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and discards the state automatically.
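The refreshable per-key timeout described above can be sketched in plain Java (all names here are illustrative, not a Flink API): every event pushes the session's deadline forward, and a periodic sweep discards sessions whose deadline has passed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of a refreshable per-key timeout ("visitor is gone after
// 30 minutes of silence"): each event resets the key's deadline, and a periodic
// sweep removes sessions whose deadline has passed.
public class SessionTimeoutSketch {
    private final Map<String, Long> deadlines = new HashMap<>();
    private final long timeoutMillis;

    public SessionTimeoutSketch(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    // Called for every event of the session; refreshes the timeout.
    public void onEvent(String sessionId, long eventTimeMillis) {
        deadlines.put(sessionId, eventTimeMillis + timeoutMillis);
    }

    // Removes and returns all sessions considered "gone" at the given time.
    public List<String> sweep(long nowMillis) {
        List<String> expired = new ArrayList<>();
        deadlines.entrySet().removeIf(e -> {
            if (e.getValue() <= nowMillis) { expired.add(e.getKey()); return true; }
            return false;
        });
        return expired;
    }
}
```

Unlike a window, nothing is buffered here: only the deadline is kept per key, which is the effect the reporter is after.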



--


[jira] [Comment Edited] (FLINK-8413) Snapshot state of aggregated data is not maintained in flink's checkpointing

2018-01-11 Thread suganya (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323546#comment-16323546
 ] 

suganya edited comment on FLINK-8413 at 1/12/18 5:09 AM:
-

The following code uses Apache Beam libraries to create the pipeline:

public void run(String[] args) {
  BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(BeamCLIOptions.class);
  Pipeline pipeline = Pipeline.create(beamCliOptions);
  MergeDistribution mergeDistribution = MergeDistribution
      .valueOf(beamCliOptions.getMergeDistribution());
  MergeDistribution fixedWindowDuration = MergeDistribution
      .valueOf(beamCliOptions.getFixedWindowSize());
  KafkaIO.Read<String, String> kafkaEntityStreamReader = KafkaIO.<String, String>read()
      .withBootstrapServers(beamCliOptions.getKafkaServers())
      .withTopic(beamCliOptions.getKafkaTopic())
      .withKeyDeserializer(StringDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .updateConsumerProperties(ImmutableMap.of(
          "auto.offset.reset", "latest",
          "enable.auto.commit", "true"));

  pipeline.apply(kafkaEntityStreamReader.withoutMetadata())
      .apply(Values.create())
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins())))
          .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins()))))
          .discardingFiredPanes()
          .withAllowedLateness(Duration.ZERO))
      .apply(ParDo.of(new ExtractDataFn(
          beamCliOptions.getDatePartitionKey(),
          new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis())))
      .apply("Applying GroupByKey on -MM-DD HH ", GroupByKey.create())
      .apply("Applying Merge ", ParDo.of(new MergeDataFn(beamCliOptions)));

  pipeline.run();
}



> Snapshot state of aggregated data is not maintained in flink's checkpointing
> 
>
> Key: FLINK-8413
> URL: https://issues.apache.org/jira/browse/FLINK-8413
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: suganya
>
> We have a project that consumes events from Kafka and does a groupBy in a 
> time window (5 mins); after the window elapses it pushes the events 
> downstream for merge. The project is deployed using Flink, and we have 
> enabled checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). 
> Event offsets are checkpointed before the entire DAG (groupBy and merge) 
> finishes. So in case of any 

[jira] [Commented] (FLINK-8413) Snapshot state of aggregated data is not maintained in flink's checkpointing

2018-01-11 Thread suganya (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323546#comment-16323546
 ] 

suganya commented on FLINK-8413:


The following code uses Apache Beam libraries to create the pipeline:

public void run(String[] args) {
  BeamCLIOptions beamCliOptions = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(BeamCLIOptions.class);
  Pipeline pipeline = Pipeline.create(beamCliOptions);
  MergeDistribution mergeDistribution = MergeDistribution
      .valueOf(beamCliOptions.getMergeDistribution());
  MergeDistribution fixedWindowDuration = MergeDistribution
      .valueOf(beamCliOptions.getFixedWindowSize());
  KafkaIO.Read<String, String> kafkaEntityStreamReader = KafkaIO.<String, String>read()
      .withBootstrapServers(beamCliOptions.getKafkaServers())
      .withTopic(beamCliOptions.getKafkaTopic())
      .withKeyDeserializer(StringDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .updateConsumerProperties(ImmutableMap.of(
          "auto.offset.reset", "latest",
          "enable.auto.commit", "true"));

  pipeline.apply(kafkaEntityStreamReader.withoutMetadata())
      .apply(Values.create())
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(fixedWindowDuration.getMins())))
          .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(mergeDistribution.getMins()))))
          .discardingFiredPanes()
          .withAllowedLateness(Duration.ZERO))
      .apply(ParDo.of(new ExtractDataFn(
          beamCliOptions.getDatePartitionKey(),
          new DateTime().minusDays(beamCliOptions.getDaysAgo()).getMillis())))
      .apply("Applying GroupByKey on -MM-DD HH ", GroupByKey.create())
      .apply("Applying Merge ", ParDo.of(new MergeDataFn(beamCliOptions)));

  pipeline.run();
}

> Snapshot state of aggregated data is not maintained in flink's checkpointing
> 
>
> Key: FLINK-8413
> URL: https://issues.apache.org/jira/browse/FLINK-8413
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: suganya
>
> We have a project that consumes events from Kafka and does a groupBy in a 
> time window (5 mins); after the window elapses it pushes the events 
> downstream for merge. The project is deployed using Flink, and we have 
> enabled checkpointing to recover from a failed state.
> (windowSize: 5 mins, checkpointingInterval: 5 mins, state.backend: filesystem)
> Offsets from Kafka get checkpointed every 5 mins (checkpointingInterval). 
> Event offsets are checkpointed before the entire DAG (groupBy and merge) 
> finishes, so in case of any restart of the task manager, the new task starts 
> from the last successful checkpoint, but we could not get the aggregated 
> snapshot data (data from the groupBy task) from the persisted checkpoint.
> We are able to retrieve the last successfully checkpointed offset from Kafka, 
> but could not get the aggregated data accumulated up to the checkpoint.



--


[GitHub] flink issue #5280: [hotfix] Fix typo in AbstractMetricGroup.java

2018-01-11 Thread maqingxiang
Github user maqingxiang commented on the issue:

https://github.com/apache/flink/pull/5280
  
Done, sorry.


---


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323447#comment-16323447
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5182
  
Thanks @casidiablo. Merging ...


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.
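As a rough illustration of the gauge-per-shard idea (plain Java, not Flink's actual MetricGroup API; all class and method names are assumptions): the shard's fetch loop updates a backing value, and the registered gauge exposes a read-only view of it, keyed by shard id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Illustrative sketch: one gauge per shard, each reading the latest
// millisBehindLatest value reported by that shard's fetch loop.
public class ShardMetricsSketch {
    private final Map<String, AtomicLong> millisBehindLatest = new HashMap<>();
    private final Map<String, LongSupplier> gauges = new HashMap<>();

    // Register a gauge for a shard; the fetch loop updates the returned value.
    public AtomicLong registerShard(String shardId) {
        AtomicLong value = new AtomicLong(-1); // -1: no record fetched yet
        millisBehindLatest.put(shardId, value);
        gauges.put(shardId, value::get); // gauge = read-only view, tagged by shard id
        return value;
    }

    public long read(String shardId) {
        return gauges.get(shardId).getAsLong();
    }
}
```

The key property is that the gauge is registered once per shard and merely reads the latest value, so reporting adds no work to the per-record fetch path.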



--


[GitHub] flink issue #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' millisBehi...

2018-01-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5182
  
Thanks @casidiablo. Merging ...


---


[jira] [Commented] (FLINK-3296) DataStream.write*() methods are not flushing properly

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323442#comment-16323442
 ] 

ASF GitHub Bot commented on FLINK-3296:
---

Github user coveralls commented on the issue:

https://github.com/apache/flink/pull/1563
  

[![Coverage 
Status](https://coveralls.io/builds/15015307/badge)](https://coveralls.io/builds/15015307)

Changes Unknown when pulling **df49d5bb8ba778cdd17b94318c4bf48c6d1747ad on 
rmetzger:flink3296** into ** on apache:master**.



> DataStream.write*() methods are not flushing properly
> -
>
> Key: FLINK-3296
> URL: https://issues.apache.org/jira/browse/FLINK-3296
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.0.0
>
>
> The DataStream.write() methods rely on the {{FileSinkFunctionByMillis}} 
> class, which has a logic for flushing records, even though the underlying 
> stream is never flushed. This is misleading for users as files are not 
> written as they would expect it.
> The code was initial written with FileOutputFormats in mind, but the types 
> were not set correctly. This PR opened the write() method to any output 
> format: https://github.com/apache/flink/pull/706/files
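The pitfall can be demonstrated independently of Flink with a minimal sketch: bytes written through a buffered wrapper are not visible at the destination until the wrapper itself is flushed, which is exactly what a sink's flushing logic must actually do (rather than only appearing to).

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative sketch: a record written through a buffered wrapper only becomes
// visible at the destination once the wrapper itself is flushed.
public class FlushSketch {
    public static int visibleAfterWrite(boolean flush) {
        ByteArrayOutputStream target = new ByteArrayOutputStream(); // stands in for a file or socket
        OutputStream out = new BufferedOutputStream(target, 8192);
        try {
            out.write("record-1\n".getBytes(StandardCharsets.UTF_8));
            if (flush) {
                out.flush(); // propagate the buffered bytes to the underlying stream
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target.size(); // bytes actually visible at the destination
    }
}
```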



--


[GitHub] flink issue #1563: [FLINK-3296] Remove 'flushing' behavior of the OutputForm...

2018-01-11 Thread coveralls
Github user coveralls commented on the issue:

https://github.com/apache/flink/pull/1563
  

[![Coverage 
Status](https://coveralls.io/builds/15015307/badge)](https://coveralls.io/builds/15015307)

Changes Unknown when pulling **df49d5bb8ba778cdd17b94318c4bf48c6d1747ad on 
rmetzger:flink3296** into ** on apache:master**.



---


[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323429#comment-16323429
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5284

[FLINK-8306] [kafka, tests] Fix invalid mock verifications on final method

## What is the purpose of the change

This is a reworked version of #5200.
Instead of introducing a new interface while we are still unsure of where the 
refactoring of the Kafka consumer should head, this version is a simple fix to 
have proper mocks in the `FlinkKafkaConsumerBase` tests.

Only the last two commits are relevant (PR is based on #5188).

## Brief change log

- ca1aa00 Remove invalid mock verifications, and introduce a proper 
`MockFetcher` mock.
- d08727c is a hotfix that corrects a stale comment on a no-longer existing 
behaviour

## Verifying this change

This change is a code cleanup, and is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8306-reworked

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5284


commit 735972332ff25623898b25f210b7277aafbc7351
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.

commit ca1aa0074c0ae4ff8e2da4f8d87b8378c2413ef5
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-12T00:45:32Z

[FLINK-8306] [kafka, tests] Fix mock verifications on final method

Previously, offset commit behavioural tests relied on verifying calls to
AbstractFetcher::commitInternalOffsetsToKafka(). That method is actually
final and could not be mocked.

This commit fixes that by implementing a proper mock AbstractFetcher,
which keeps track of the offset commits that go through.

commit d08727c4f8816e408dabc12a673ad24593b27023
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T19:54:40Z

[hotfix] [kafka] Remove stale comment on publishing procedures of 
AbstractFetcher

The previous comment mentioned "only now will the fetcher return at
least the restored offsets when calling snapshotCurrentState()". This is
a remnant of the previous fetcher initialization behaviour, where in the
past the fetcher wasn't directly seeded with restored offsets on
instantiation.

Since this is no longer true, this commit fixes the stale comment to
avoid confusion.




> FlinkKafkaConsumerBaseTest has invalid mocks on final methods
> -
>
> Key: FLINK-8306
> URL: https://issues.apache.org/jira/browse/FLINK-8306
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final 
> {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy 
> fix would be to 

[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323430#comment-16323430
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
Subsumed by the re-opened PR: #5284 


> FlinkKafkaConsumerBaseTest has invalid mocks on final methods
> -
>
> Key: FLINK-8306
> URL: https://issues.apache.org/jira/browse/FLINK-8306
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final 
> {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy 
> fix would be to simply make that method non-final, that is not ideal since it 
> would be best that the method is left final to prevent overrides in 
> subclasses.
> This suggests that offset committing functionality is too tightly coupled 
> with the {{AbstractFetcher}}, making it hard to perform concise tests to 
> verify offset committing.
> I suggest that we decouple record fetching and offset committing as separate 
> services behind different interfaces. We should introduce a new interface, 
> say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we 
> can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}.
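The MockFetcher approach the reworked PR took can be illustrated with a minimal plain-Java sketch (simplified, hypothetical names; not the real AbstractFetcher signature): the final template method cannot be overridden or verified by classic Mockito, so the test double implements the overridable hook behind it and keeps track of the offset commits that go through.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of a hand-rolled mock for a class with a final template method.
public class MockFetcherSketch {
    public abstract static class AbstractFetcher {
        // Final template method: validates, then delegates to the hook.
        public final void commitOffsets(Map<Integer, Long> offsets) {
            if (offsets == null) throw new IllegalArgumentException("offsets must not be null");
            doCommitOffsets(offsets);
        }
        protected abstract void doCommitOffsets(Map<Integer, Long> offsets);
    }

    // Test double: records every commit that reaches the hook.
    public static class MockFetcher extends AbstractFetcher {
        public final List<Map<Integer, Long>> committed = new ArrayList<>();
        @Override
        protected void doCommitOffsets(Map<Integer, Long> offsets) {
            committed.add(new HashMap<>(offsets));
        }
    }
}
```

Tests then assert on `committed` directly, with no Mockito verification against the final method.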



--


[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323431#comment-16323431
 ] 

ASF GitHub Bot commented on FLINK-8306:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/5200


> FlinkKafkaConsumerBaseTest has invalid mocks on final methods
> -
>
> Key: FLINK-8306
> URL: https://issues.apache.org/jira/browse/FLINK-8306
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final 
> {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy 
> fix would be to simply make that method non-final, that is not ideal since it 
> would be best that the method is left final to prevent overrides in 
> subclasses.
> This suggests that offset committing functionality is too tightly coupled 
> with the {{AbstractFetcher}}, making it hard to perform concise tests to 
> verify offset committing.
> I suggest that we decouple record fetching and offset committing as separate 
> services behind different interfaces. We should introduce a new interface, 
> say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we 
> can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}.



--


[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...

2018-01-11 Thread tzulitai
Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/5200


---


[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
Subsumed by the re-opened PR: #5284 


---


[GitHub] flink pull request #5284: [FLINK-8306] [kafka, tests] Fix invalid mock verif...

2018-01-11 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5284

[FLINK-8306] [kafka, tests] Fix invalid mock verifications on final method

## What is the purpose of the change

This is a reworked version of #5200.
Instead of introducing a new interface while we are still unsure of where the 
refactoring of the Kafka consumer should head, this version is a simple fix to 
have proper mocks in the `FlinkKafkaConsumerBase` tests.

Only the last two commits are relevant (PR is based on #5188).

## Brief change log

- ca1aa00 Remove invalid mock verifications, and introduce a proper 
`MockFetcher` mock.
- d08727c is a hotfix that corrects a stale comment on a no-longer existing 
behaviour

## Verifying this change

This change is a code cleanup, and is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8306-reworked

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5284


commit 735972332ff25623898b25f210b7277aafbc7351
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.

commit ca1aa0074c0ae4ff8e2da4f8d87b8378c2413ef5
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-12T00:45:32Z

[FLINK-8306] [kafka, tests] Fix mock verifications on final method

Previously, offset commit behavioural tests relied on verifying calls to
AbstractFetcher::commitInternalOffsetsToKafka(). That method is actually
final and could not be mocked.

This commit fixes that by implementing a proper mock AbstractFetcher,
which keeps track of the offset commits that go through.

commit d08727c4f8816e408dabc12a673ad24593b27023
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T19:54:40Z

[hotfix] [kafka] Remove stale comment on publishing procedures of 
AbstractFetcher

The previous comment mentioned "only now will the fetcher return at
least the restored offsets when calling snapshotCurrentState()". This is
a remnant of the previous fetcher initialization behaviour, where in the
past the fetcher wasn't directly seeded with restored offsets on
instantiation.

Since this is no longer true, this commit fixes the stale comment to
avoid confusion.




---


[jira] [Created] (FLINK-8419) Kafka consumer's offset metrics are not registered for dynamically discovered partitions

2018-01-11 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8419:
--

 Summary: Kafka consumer's offset metrics are not registered for 
dynamically discovered partitions
 Key: FLINK-8419
 URL: https://issues.apache.org/jira/browse/FLINK-8419
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Metrics
Affects Versions: 1.4.0, 1.5.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Priority: Blocker
 Fix For: 1.5.0, 1.4.1


Currently, the per-partition offset metrics are registered via the 
{{AbstractFetcher#addOffsetStateGauge}} method. That method is only ever called 
for the initial startup partitions, and not for dynamically discovered 
partitions.

We should consider adding some unit tests to make sure that metrics are 
properly registered for all partitions. That would also safeguard us from 
accidentally removing metrics.



--


[jira] [Closed] (FLINK-8092) Makes no difference if python script is found or not

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8092.
---
   Resolution: Duplicate
Fix Version/s: 1.4.0

> Makes no difference if python script is found or not
> 
>
> Key: FLINK-8092
> URL: https://issues.apache.org/jira/browse/FLINK-8092
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.3.2
> Environment: The error message is the SAME whether or not the path to the 
> python job is correct, as below:
> [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.py
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ 
>  The program didn't contain a Flink job. Perhaps you forgot to call execute() 
> on the execution environment.
> [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.pys
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ 
>  The program didn't contain a Flink job. Perhaps you forgot to call execute() 
> on the execution environment.
> The first link is correct and was tested. The second is a fake. Still, the 
> error is the SAME. Bug?
>Reporter: Al Costa
> Fix For: 1.4.0
>
>




--


[jira] [Reopened] (FLINK-8092) Makes no difference if python script is found or not

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-8092:
-

> Makes no difference if python script is found or not
> 
>
> Key: FLINK-8092
> URL: https://issues.apache.org/jira/browse/FLINK-8092
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.3.2
> Environment: The error message is the SAME whether or not the path to the 
> python job is correct, as below:
> [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.py
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ 
>  The program didn't contain a Flink job. Perhaps you forgot to call execute() 
> on the execution environment.
> [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.pys
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ 
>  The program didn't contain a Flink job. Perhaps you forgot to call execute() 
> on the execution environment.
> The first link is correct and was tested. The second is a fake. Still, the 
> error is the SAME. Bug?
>Reporter: Al Costa
>






[jira] [Closed] (FLINK-8092) Makes no difference if python script is found or not

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8092.
---
Resolution: Fixed

> Makes no difference if python script is found or not
> 
>
> Key: FLINK-8092
> URL: https://issues.apache.org/jira/browse/FLINK-8092
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.3.2
> Environment: The error message is the SAME whether the right path or 
> not for a python job, as below:
> [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.py
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ 
>  The program didn't contain a Flink job. Perhaps you forgot to call execute() 
> on the execution environment.
> [root@master bin]# ./pyflink.sh /home/al/flink/examples/python/WordCount.pys
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Usage: ./bin/pyflink<2/3>.[sh/bat] [ [ 
>  The program didn't contain a Flink job. Perhaps you forgot to call execute() 
> on the execution environment.
> The first link is correct and was tested. The second is a fake. Still, the 
> error is the SAME. Bug?
>Reporter: Al Costa
>






[jira] [Closed] (FLINK-8102) Formatting issues in Mesos documentation.

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8102.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

1.4: 28b3115d23ab6844583578f7f2c7c37316c199be
master: cb73078e63e74ca19b37b07b54206c0985cec924

> Formatting issues in Mesos documentation. 
> --
>
> Key: FLINK-8102
> URL: https://issues.apache.org/jira/browse/FLINK-8102
> Project: Flink
>  Issue Type: Bug
>Reporter: Jörg Schad
>Assignee: Jörg Schad
>Priority: Minor
> Fix For: 1.4.0
>
>
> The Flink documentation renders incorrectly as some characters are not 
> probably escaped.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#mesos





[jira] [Closed] (FLINK-7787) Remove guava dependency in the cassandra connector

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7787.
---
Resolution: Won't Fix

> Remove guava dependency in the cassandra connector
> --
>
> Key: FLINK-7787
> URL: https://issues.apache.org/jira/browse/FLINK-7787
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> As discovered in FLINK-6225, the cassandra connector uses the future classes 
> in the guava library. We can get rid of the dependency by using the 
> equivalent classes provided by Java 8.





[jira] [Closed] (FLINK-7927) Different Netty Versions in dependencies of flink-runtime make it impossible to use 3rd party libraries using netty

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7927.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

All netty dependencies have been relocated and will no longer cause conflicts.

> Different Netty Versions in dependencies of flink-runtime make it impossible 
> to use 3rd party libraries using netty
> ---
>
> Key: FLINK-7927
> URL: https://issues.apache.org/jira/browse/FLINK-7927
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: * Windows 10 x64
> * Java 1.8
>Reporter: Claudius Eisele
> Fix For: 1.4.0
>
>
> I am trying to use Google PubSub (google-cloud-pubsub 0.26.0-beta) in a Flink 
> streaming job, but I am receiving the following error when executing it, so 
> unfortunately it is not possible to use PubSub in a Flink streaming job:
> {code:java}
> ...
> 10/25/2017 22:38:02 Source: Custom Source -> Map(1/1) switched to RUNNING
> 10/25/2017 22:38:03 Source: Custom Source -> Map(1/1) switched to FAILED
> java.lang.IllegalStateException: Expected the service InnerService [FAILED] 
> to be RUNNING, but the service has FAILED
> at 
> com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:328)
> at 
> com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266)
> at 
> com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97)
> 
> Caused by: java.lang.IllegalArgumentException: Jetty ALPN/NPN has not been 
> properly configured.
> at 
> io.grpc.netty.GrpcSslContexts.selectApplicationProtocolConfig(GrpcSslContexts.java:159)
> at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:136)
> at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:124)
> at io.grpc.netty.GrpcSslContexts.forClient(GrpcSslContexts.java:94)
> at 
> io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:525)
> at 
> io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:518)
> at 
> io.grpc.netty.NettyChannelBuilder$NettyTransportFactory.(NettyChannelBuilder.java:457)
> at 
> io.grpc.netty.NettyChannelBuilder.buildTransportFactory(NettyChannelBuilder.java:326)
> at 
> io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:315)
> at 
> com.google.api.gax.grpc.InstantiatingChannelProvider.createChannel(InstantiatingChannelProvider.java:131)
> at 
> com.google.api.gax.grpc.InstantiatingChannelProvider.getChannel(InstantiatingChannelProvider.java:116)
> at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:246)
> at 
> com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:149)
> at 
> com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:211)
> at 
> com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:121)
> at 
> com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:235)
> ... 7 more
> {code}
> I reported this problem to the Google Cloud Java library, but the problem 
> seems to lie more in Flink or its dependencies such as Akka, because there 
> are a lot of netty dependencies with different versions in it:
> * Apache Zookeeper (flink-runtime dependency) has \--- 
> io.netty:netty:3.7.0.Final -> 3.8.0.Final
> * Flakka (flink-runtime dependency) has io.netty:netty:3.8.0.Final
> * Flink-Runtime has io.netty:netty-all:4.0.27.Final
> In my case, Google Cloud PubSub has io.grpc:grpc-netty:1.6.1
> Additional information on the issue in combination with Google Cloud PubSub 
> can be found here:
> https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2398
> https://github.com/grpc/grpc-java/issues/3025
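When multiple netty versions end up on the classpath like this, a useful first debugging step is to check which jar a conflicting class was actually loaded from. A minimal JDK-only sketch (the class below is a placeholder; substitute e.g. io.netty.buffer.CompositeByteBuf):

```java
public class WhichJar {
    // Returns where a class was loaded from, or "bootstrap classloader"
    // for JDK platform classes, whose CodeSource is null.
    static String locationOf(Class<?> c) {
        java.security.CodeSource src = c.getProtectionDomain().getCodeSource();
        return src == null ? "bootstrap classloader" : src.getLocation().toString();
    }

    public static void main(String[] args) {
        // Placeholder class; substitute the conflicting netty class here.
        System.out.println(locationOf(String.class)); // bootstrap classloader
    }
}
```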





[jira] [Comment Edited] (FLINK-7746) move ResultPartitionWriter#writeBufferToAllChannels up into ResultPartition

2018-01-11 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323354#comment-16323354
 ] 

Chesnay Schepler edited comment on FLINK-7746 at 1/12/18 1:18 AM:
--

[~NicoK] can this be closed (along with the parent issue)?


was (Author: zentol):
[~NicoK] can this be closed?

> move ResultPartitionWriter#writeBufferToAllChannels up into ResultPartition
> ---
>
> Key: FLINK-7746
> URL: https://issues.apache.org/jira/browse/FLINK-7746
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> As a step towards removing the (unneeded) {{ResultPartitionWriter}} wrapper, 
> we should move {{ResultPartitionWriter#writeBufferToAllChannels}} into 
> {{ResultPartition}} where single-channel writing happens anyway.





[jira] [Commented] (FLINK-7746) move ResultPartitionWriter#writeBufferToAllChannels up into ResultPartition

2018-01-11 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323354#comment-16323354
 ] 

Chesnay Schepler commented on FLINK-7746:
-

[~NicoK] can this be closed?

> move ResultPartitionWriter#writeBufferToAllChannels up into ResultPartition
> ---
>
> Key: FLINK-7746
> URL: https://issues.apache.org/jira/browse/FLINK-7746
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> As a step towards removing the (unneeded) {{ResultPartitionWriter}} wrapper, 
> we should move {{ResultPartitionWriter#writeBufferToAllChannels}} into 
> {{ResultPartition}} where single-channel writing happens anyway.





[jira] [Closed] (FLINK-7364) Log exceptions from user code in streaming jobs

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7364.
---
Resolution: Cannot Reproduce

> Log exceptions from user code in streaming jobs
> ---
>
> Key: FLINK-7364
> URL: https://issues.apache.org/jira/browse/FLINK-7364
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.1
>Reporter: Elias Levy
>
> Currently, if an exception arises in user-supplied code within an operator in 
> a streaming job, Flink terminates the job but fails to record the reason for 
> the termination. The logs do not record that there was an exception at all, 
> much less the type of exception and where it occurred. This makes it 
> difficult to debug jobs without implementing exception-recording code in all 
> user-supplied operators.





[jira] [Updated] (FLINK-6375) Fix LongValue hashCode

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6375:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-3957

> Fix LongValue hashCode
> --
>
> Key: FLINK-6375
> URL: https://issues.apache.org/jira/browse/FLINK-6375
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Affects Versions: 2.0.0
>Reporter: Greg Hogan
>Assignee: Andrew Psaltis
>Priority: Trivial
>
> Match {{LongValue.hashCode}} to {{Long.hashCode}} (and the other numeric 
> types) by simply adding the high and low words rather than shifting the hash 
> by adding 43.
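For reference, {{Long.hashCode}} combines the two 32-bit words of the value with an XOR. A sketch of a mutable long wrapper whose hash is aligned with it (class name hypothetical, not the actual Flink patch):

```java
public class LongValueHashSketch {
    private long value;

    public LongValueHashSketch(long value) { this.value = value; }

    // Matches java.lang.Long.hashCode(value): XOR of the high and
    // low 32-bit words of the value.
    @Override
    public int hashCode() {
        return (int) (value ^ (value >>> 32));
    }

    public static void main(String[] args) {
        long v = 1234567890123L;
        System.out.println(new LongValueHashSketch(v).hashCode() == Long.hashCode(v)); // true
    }
}
```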





[GitHub] flink issue #5280: Fix typo in AbstractMetricGroup.java

2018-01-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5280
  
Thanks, merging this.

Do note that you should include the `[hotfix]` and `[doc]` tags in the 
commit message. All our commits require proper tags to indicate the issue id 
(or `hotfix` for typos), and ideally also the fixed component.

I'll do it for you while merging this time ;)


---


[jira] [Closed] (FLINK-6234) HTTP/HTTPS WebServer Extension

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6234.
---
   Resolution: Invalid
Fix Version/s: (was: 2.0.0)

Not enough information was provided.

> HTTP/HTTPS WebServer Extension
> --
>
> Key: FLINK-6234
> URL: https://issues.apache.org/jira/browse/FLINK-6234
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 2.0.0
> Environment: All
>Reporter: Abey Sam Alex
>  Labels: WebFramework
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> All of the existing NIO-based Java web servers have their processing locked 
> to a single machine, and web frameworks are locked down to single instances, 
> although by nature every web request is an endless stream of events with 
> multiple pipes within it; the way it functions is very similar to how Flink 
> works.
> Most web server scaling is limited to adding more servers, rarely utilizing 
> all of the physical cores or pipelining the intermediate steps. There are 
> other advantages that Flink can bring about by having an extension for web 
> servers.
> The intent is to provide an extension with which one can write a web 
> application using the Flink framework, take advantage of the streaming 
> architecture, and scale easily by adding new nodes at run time.





[jira] [Updated] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-6206:

Summary: Log task state transitions as warn/error for FAILURE scenarios  
(was: As an Engineer, I want task state transition log to be warn/error for 
FAILURE scenarios)

> Log task state transitions as warn/error for FAILURE scenarios
> --
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at warn 
> or error level. Currently it is logged at info:
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
>     if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>         if (cause == null) {
>             LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState);
>         } else {
>             LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);
>         }
>         return true;
>     } else {
>         return false;
>     }
> }
> {code}
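The requested change can be sketched as a small helper that picks the log level from the failure cause (names hypothetical, not the actual Flink code):

```java
import java.util.logging.Level;

public class TransitionLogLevelSketch {
    // Hypothetical helper: failed transitions get WARNING, normal ones INFO.
    static Level levelFor(String newState, Throwable cause) {
        return (cause != null || "FAILED".equals(newState)) ? Level.WARNING : Level.INFO;
    }

    public static void main(String[] args) {
        System.out.println(levelFor("RUNNING", null));                  // INFO
        System.out.println(levelFor("FAILED", new RuntimeException())); // WARNING
    }
}
```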





[GitHub] flink issue #5283: [hotfix][doc] Fixed doc typo in DataStream API

2018-01-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5283
  
Merging ..


---


[jira] [Closed] (FLINK-5848) make Flink Web Backend a little bit more restful

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5848.
---
   Resolution: Later
Fix Version/s: 1.5.0

Subsumed by the REST backend rework.

> make Flink Web Backend a little bit more restful
> 
>
> Key: FLINK-5848
> URL: https://issues.apache.org/jira/browse/FLINK-5848
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Fabian Wollert
>Assignee: Fabian Wollert
> Fix For: 1.5.0
>
>
> We are using the web backend for managing Flink jobs (cancelling, starting, 
> etc.). Unfortunately the backend is not completely RESTful; the responses are 
> mixed.
> E.g. 
> https://github.com/apache/flink/blob/master/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
> shows that if an error occurs in the backend, it does not result in an 
> HTTP error code and the response is not JSON.
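The expected contract can be sketched as: every backend failure maps to an HTTP error status plus a JSON body, instead of a 200 with mixed content (names and shapes hypothetical, not the actual handler API):

```java
public class RestErrorSketch {
    // Hypothetical mapping: no error -> 200 with a JSON status,
    // otherwise 500 with a JSON error body.
    static String[] toResponse(Throwable error) {
        if (error == null) {
            return new String[] {"200", "{\"status\":\"ok\"}"};
        }
        return new String[] {"500", "{\"error\":\"" + error.getMessage() + "\"}"};
    }

    public static void main(String[] args) {
        String[] r = toResponse(new RuntimeException("actor unreachable"));
        System.out.println(r[0] + " " + r[1]); // 500 {"error":"actor unreachable"}
    }
}
```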





[jira] [Closed] (FLINK-5746) Add acceptEither and applyToEither to Flink's Future

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5746.
---
Resolution: Won't Fix

Our Future class has been replaced with Java 8 {{CompletableFuture}}s.

> Add acceptEither and applyToEither to Flink's Future
> 
>
> Key: FLINK-5746
> URL: https://issues.apache.org/jira/browse/FLINK-5746
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Priority: Minor
>
> Flink's futures are missing the method {{acceptEither}} and {{applyToEither}} 
> in order to react to the completion of one of two futures. Adding them would 
> be helpful.
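For comparison, the Java 8 {{CompletableFuture}} that eventually replaced Flink's Future ships both methods out of the box:

```java
import java.util.concurrent.CompletableFuture;

public class EitherExample {
    public static void main(String[] args) {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completes here

        // applyToEither: transform whichever of the two futures completes first.
        String result = fast.applyToEither(slow, s -> "first: " + s).join();
        System.out.println(result); // first: fast

        // acceptEither: consume the first result without producing a value.
        fast.acceptEither(slow, s -> System.out.println("got " + s)).join();
    }
}
```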





[jira] [Closed] (FLINK-5691) Creating Reporter for elasticsearch 5.1.X causing conflicts of io.netty library

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5691.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

We have relocated our netty dependency to prevent conflicts.

> Creating Reporter for elasticsearch 5.1.X causing conflicts of io.netty 
> library
> ---
>
> Key: FLINK-5691
> URL: https://issues.apache.org/jira/browse/FLINK-5691
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.0
> Environment: linux 
>Reporter: prabhat kumar
>  Labels: features, maven
> Fix For: 1.4.0
>
>
> Trying to write a reporter for elasticsearch 5.1.x using TransportClient, 
> which internally uses io.netty version 4.1.6. It calls a method that is not 
> present in Flink's io.netty version 4.0.27, causing the error below:
> ***
> Exception in thread "elasticsearch[_client_][management][T#1]" 
> java.lang.NoSuchMethodError: 
> io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
> at 
> org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
> at 
> org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)
> at 
> org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)
> at 
> org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)
> at 
> org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)
> at 
> org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)
> at 
> org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)
> at 
> org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:482)
> at 
> org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:458)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> ***
> Tried to upgrade the jar, but Flink's pom.xml mentions that it should not be 
> upgraded or else there could be memory issues:
> <dependency>
>   <groupId>io.netty</groupId>
>   <artifactId>netty-all</artifactId>
>   <version>4.0.27.Final</version>
> </dependency>
> Please suggest a workaround.





[jira] [Closed] (FLINK-5453) REST API request reference incomplete

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5453.
---
Resolution: Later

Will be fixed in 1.5 as part of FLINK-8133.

> REST API request reference incomplete
> -
>
> Key: FLINK-5453
> URL: https://issues.apache.org/jira/browse/FLINK-5453
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>
> The REST API documentation is supposed to contain a list of all available 
> requests, is however missing all requests related to
> * jobmanager
> * taskmanagers
> * checkpoints
> * metrics





[jira] [Closed] (FLINK-5403) Apache Flink Snapshot 1.2 Javadocs link does not have files like index.html, overview-summary.html or index-all.html

2018-01-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5403.
---
Resolution: Cannot Reproduce

> Apache Flink Snapshot 1.2 Javadocs link does not have files like index.html, 
> overview-summary.html or index-all.html 
> -
>
> Key: FLINK-5403
> URL: https://issues.apache.org/jira/browse/FLINK-5403
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.4
>Reporter: Markus Dale
>Priority: Minor
>
> The link http://flink.apache.org - Documentation - Snapshot (Development) - 
> 1.2 Javadocs 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/) 
> points to a directory that does not seem to contain an index.html file so the 
> Javadoc Frame does not get displayed. 
> The Javadoc html files for individual classes exist (e.g. 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/org/apache/flink/client/cli/CancelOptions.html)
>  but when clicking from there to Overview or Frames 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/index.html?org/apache/flink/client/cli/CancelOptions.html)
>  those resources are not found.





[jira] [Updated] (FLINK-8335) Upgrade hbase connector dependency to 1.4.0

2018-01-11 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8335:
--
Component/s: Batch Connectors and Input/Output Formats

> Upgrade hbase connector dependency to 1.4.0
> ---
>
> Key: FLINK-8335
> URL: https://issues.apache.org/jira/browse/FLINK-8335
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ted Yu
>Priority: Minor
>
> hbase 1.4.0 has been released.
> 1.4.0 shows speed improvement over previous 1.x releases.
> http://search-hadoop.com/m/HBase/YGbbBxedD1Mnm8t?subj=Re+VOTE+The+second+HBase+1+4+0+release+candidate+RC1+is+available
> This issue is to upgrade the dependency to 1.4.0





[jira] [Created] (FLINK-8418) Kafka08ITCase.testStartFromLatest() times out on Travis

2018-01-11 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8418:
--

 Summary: Kafka08ITCase.testStartFromLatest() times out on Travis
 Key: FLINK-8418
 URL: https://issues.apache.org/jira/browse/FLINK-8418
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Tests
Affects Versions: 1.4.0, 1.5.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Priority: Critical
 Fix For: 1.5.0, 1.4.1


Instance: https://travis-ci.org/kl0u/flink/builds/327733085





[jira] [Updated] (FLINK-8418) Kafka08ITCase.testStartFromLatestOffsets() times out on Travis

2018-01-11 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8418:
---
Summary: Kafka08ITCase.testStartFromLatestOffsets() times out on Travis  
(was: Kafka08ITCase.testStartFromLatest() times out on Travis)

> Kafka08ITCase.testStartFromLatestOffsets() times out on Travis
> --
>
> Key: FLINK-8418
> URL: https://issues.apache.org/jira/browse/FLINK-8418
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.1
>
>
> Instance: https://travis-ci.org/kl0u/flink/builds/327733085





[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()

2018-01-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323281#comment-16323281
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8073:


Another instance: https://api.travis-ci.org/v3/job/327769316/log.txt

> Test instability 
> FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
> -
>
> Key: FLINK-8073
> URL: https://issues.apache.org/jira/browse/FLINK-8073
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988





[jira] [Updated] (FLINK-8417) Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer

2018-01-11 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8417:
---
Description: 
As discussed in ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html.

Users need the functionality to access cross-account AWS Kinesis streams, using 
AWS Temporary Credentials [1].

We should add support for 
{{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally 
would use the {{STSAssumeRoleSessionCredentialsProvider}} [2] in 
{{AWSUtil#getCredentialsProvider(Properties)}}.

[1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html
[2] 
https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html

  was:
As discussed in ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html.

Users need the functionality to access cross-account AWS Kinesis streams, using 
AWS Temporary Credentials [1].

We should add support for 
{{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally 
would use the {{STSAssumeRoleSessionCredentialsProvider}} in 
{{AWSUtil#getCredentialsProvider(Properties)}}.

[1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html


> Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer
> ---
>
> Key: FLINK-8417
> URL: https://issues.apache.org/jira/browse/FLINK-8417
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.5.0
>
>
> As discussed in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html.
> Users need the functionality to access cross-account AWS Kinesis streams, 
> using AWS Temporary Credentials [1].
> We should add support for 
> {{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally 
> would use the {{STSAssumeRoleSessionCredentialsProvider}} [2] in 
> {{AWSUtil#getCredentialsProvider(Properties)}}.
> [1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html
> [2] 
> https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html





[jira] [Created] (FLINK-8417) Support STSAssumeRoleSessionCredentialsProvider in FlinkKinesisConsumer

2018-01-11 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8417:
--

 Summary: Support STSAssumeRoleSessionCredentialsProvider in 
FlinkKinesisConsumer
 Key: FLINK-8417
 URL: https://issues.apache.org/jira/browse/FLINK-8417
 Project: Flink
  Issue Type: New Feature
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai
 Fix For: 1.5.0


As discussed in ML: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connectors-With-Temporary-Credentials-td17734.html.

Users need the functionality to access cross-account AWS Kinesis streams, using 
AWS Temporary Credentials [1].

We should add support for 
{{AWSConfigConstants.CredentialsProvider.STSAssumeRole}}, which internally 
would use the {{STSAssumeRoleSessionCredentialsProvider}} in 
{{AWSUtil#getCredentialsProvider(Properties)}}.

[1] https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html





[jira] [Created] (FLINK-8416) Kinesis consumer doc examples should demonstrate preferred default credentials provider

2018-01-11 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8416:
--

 Summary: Kinesis consumer doc examples should demonstrate 
preferred default credentials provider
 Key: FLINK-8416
 URL: https://issues.apache.org/jira/browse/FLINK-8416
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai
 Fix For: 1.3.3, 1.5.0, 1.4.1


The Kinesis consumer docs 
[here](https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kinesis.html#kinesis-consumer)
 demonstrate providing credentials by explicitly supplying the AWS Access ID 
and Key.

The preferred approach for AWS, unless running locally, is always to fetch 
the credentials automatically from the AWS environment.

That is actually the default behaviour of the Kinesis consumer, so the docs 
should demonstrate that more clearly.





[jira] [Commented] (FLINK-8296) Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency injection

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323208#comment-16323208
 ] 

ASF GitHub Bot commented on FLINK-8296:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5188
  
Thanks! Merging this ...


> Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency 
> injection
> -
>
> Key: FLINK-8296
> URL: https://issues.apache.org/jira/browse/FLINK-8296
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.5.0, 1.4.1
>
>
> The current {{FlinkKafkaConsumerBaseTest}} relies heavily on Java reflection 
> for dependency injection. Using reflection to compose unit tests really 
> should be a last resort; it indicates that the tests are highly 
> implementation-specific and that we should make the design more testable.
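The usual alternative to reflective field injection is a constructor seam: the test hands in a stub collaborator explicitly. A generic sketch (names hypothetical, not the actual consumer API):

```java
public class InjectionSketch {
    interface Fetcher { String fetch(); }

    // Hypothetical consumer: the collaborator arrives via the constructor,
    // so a test can pass a stub without any reflection.
    static class Consumer {
        private final Fetcher fetcher;
        Consumer(Fetcher fetcher) { this.fetcher = fetcher; }
        String poll() { return fetcher.fetch(); }
    }

    public static void main(String[] args) {
        // The "injected" dependency is just a lambda here.
        Consumer c = new Consumer(() -> "stub-record");
        System.out.println(c.poll()); // stub-record
    }
}
```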





[GitHub] flink issue #5188: [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to...

2018-01-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5188
  
Thanks! Merging this ...


---


[jira] [Closed] (FLINK-8333) Split command options from deployment options in CliFrontend

2018-01-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8333.

Resolution: Fixed

Fixed via 12396f19851e74310c9b5f28870a8de9794511fc

> Split command options from deployment options in CliFrontend
> 
>
> Key: FLINK-8333
> URL: https://issues.apache.org/jira/browse/FLINK-8333
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to better support different {{CustomCommandLines}} we should split 
> the command and deployment option parsing in the {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323194#comment-16323194
 ] 

ASF GitHub Bot commented on FLINK-8299:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5207


> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8299
> URL: https://issues.apache.org/jira/browse/FLINK-8299
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8333) Split command options from deployment options in CliFrontend

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323192#comment-16323192
 ] 

ASF GitHub Bot commented on FLINK-8333:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5220


> Split command options from deployment options in CliFrontend
> 
>
> Key: FLINK-8333
> URL: https://issues.apache.org/jira/browse/FLINK-8333
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to better support different {{CustomCommandLines}} we should split 
> the command and deployment option parsing in the {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5225: [FLINK-8339] [flip6] Let CustomCommandLine return ...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5225


---


[jira] [Commented] (FLINK-8338) Make CustomCommandLines non static in CliFrontend

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323193#comment-16323193
 ] 

ASF GitHub Bot commented on FLINK-8338:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5224


> Make CustomCommandLines non static in CliFrontend
> -
>
> Key: FLINK-8338
> URL: https://issues.apache.org/jira/browse/FLINK-8338
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For better testability and maintainability we should make the 
> {{CustomCommandLine}} registration non-static in {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5207


---


[jira] [Closed] (FLINK-8338) Make CustomCommandLines non static in CliFrontend

2018-01-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8338.

Resolution: Fixed

Fixed via aff43768f3285a5f2bc5593369a7fec3ed77a2af

> Make CustomCommandLines non static in CliFrontend
> -
>
> Key: FLINK-8338
> URL: https://issues.apache.org/jira/browse/FLINK-8338
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For better testability and maintainability we should make the 
> {{CustomCommandLine}} registration non-static in {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323191#comment-16323191
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5225


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5224: [FLINK-8338] [flip6] Make CustomCommandLines non s...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5224


---


[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5220


---


[jira] [Closed] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8339.

Resolution: Fixed

Fixed via e2f1ba92decdb27f3aea4e21a7cad7dcc98cea1a

> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8299) Retrieve ExecutionResult by REST polling

2018-01-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8299.
--
Resolution: Fixed

Fixed via 06922753a55dc322b96919ebb407d531e2b79d3e

> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8299
> URL: https://issues.apache.org/jira/browse/FLINK-8299
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8347) Make Cluster id typesafe

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323107#comment-16323107
 ] 

ASF GitHub Bot commented on FLINK-8347:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5232#discussion_r161094149
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -367,40 +366,31 @@ public ClusterClient retrieve(String applicationID) {
flinkConfiguration,
false);
} catch (Exception e) {
-   throw new RuntimeException("Couldn't retrieve Yarn 
cluster", e);
+   throw new ClusterRetrieveException("Couldn't retrieve 
Yarn cluster", e);
}
}
 
@Override
-   public YarnClusterClient deploySessionCluster(ClusterSpecification 
clusterSpecification) {
+   public YarnClusterClient deploySessionCluster(ClusterSpecification 
clusterSpecification) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
getYarnSessionClusterEntrypoint(),
null);
} catch (Exception e) {
-   throw new RuntimeException("Couldn't deploy Yarn 
session cluster", e);
+   throw new ClusterDeploymentException("Couldn't deploy 
Yarn session cluster", e);
}
}
 
@Override
-   public YarnClusterClient deployJobCluster(ClusterSpecification 
clusterSpecification, JobGraph jobGraph) {
+   public YarnClusterClient deployJobCluster(ClusterSpecification 
clusterSpecification, JobGraph jobGraph) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
getYarnJobClusterEntrypoint(),
jobGraph);
} catch (Exception e) {
-   throw new RuntimeException("Could not deploy Yarn job 
cluster.", e);
-   }
-   }
-
-   @Override
--- End diff --

Good catch. Will add it.


> Make Cluster id typesafe
> 
>
> Key: FLINK-8347
> URL: https://issues.apache.org/jira/browse/FLINK-8347
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the cluster id is of type {{String}}. We should make the id 
> typesafe to avoid mixups between different {{CustomCommandLines}} and 
> {{ClusterDescriptors}}.
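What a typesafe cluster id could look like can be sketched as follows. This is a hypothetical illustration, not Flink's actual API: the interface and class names are simplified, and `retrieve` returns a plain `String` where the real code would return a `ClusterClient`. The idea is that a dedicated id type per deployment target makes it impossible to hand, say, a YARN application id to the wrong descriptor:

```java
public class TypedClusterIdSketch {

    // The descriptor is parameterized over the id type it accepts.
    interface ClusterDescriptor<ClusterId> {
        String retrieve(ClusterId clusterId); // stand-in for returning a ClusterClient
    }

    // One dedicated id type per deployment target instead of a bare String.
    static final class YarnClusterId {
        final String applicationId;

        YarnClusterId(String applicationId) {
            this.applicationId = applicationId;
        }
    }

    static final class YarnDescriptor implements ClusterDescriptor<YarnClusterId> {
        @Override
        public String retrieve(YarnClusterId id) {
            return "client-for-" + id.applicationId;
        }
    }

    public static void main(String[] args) {
        YarnDescriptor descriptor = new YarnDescriptor();
        // descriptor.retrieve("app_123") would no longer compile;
        // the compiler now enforces the matching id type.
        System.out.println(descriptor.retrieve(new YarnClusterId("app_123")));
    }
}
```

With a bare `String` id, the mixup the issue describes is only caught at runtime; with the type parameter it becomes a compile error.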



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5232: [FLINK-8347] [flip6] Make cluster id used by Clust...

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5232#discussion_r161094149
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -367,40 +366,31 @@ public ClusterClient retrieve(String applicationID) {
flinkConfiguration,
false);
} catch (Exception e) {
-   throw new RuntimeException("Couldn't retrieve Yarn 
cluster", e);
+   throw new ClusterRetrieveException("Couldn't retrieve 
Yarn cluster", e);
}
}
 
@Override
-   public YarnClusterClient deploySessionCluster(ClusterSpecification 
clusterSpecification) {
+   public YarnClusterClient deploySessionCluster(ClusterSpecification 
clusterSpecification) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
getYarnSessionClusterEntrypoint(),
null);
} catch (Exception e) {
-   throw new RuntimeException("Couldn't deploy Yarn 
session cluster", e);
+   throw new ClusterDeploymentException("Couldn't deploy 
Yarn session cluster", e);
}
}
 
@Override
-   public YarnClusterClient deployJobCluster(ClusterSpecification 
clusterSpecification, JobGraph jobGraph) {
+   public YarnClusterClient deployJobCluster(ClusterSpecification 
clusterSpecification, JobGraph jobGraph) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
getYarnJobClusterEntrypoint(),
jobGraph);
} catch (Exception e) {
-   throw new RuntimeException("Could not deploy Yarn job 
cluster.", e);
-   }
-   }
-
-   @Override
--- End diff --

Good catch. Will add it.


---


[jira] [Commented] (FLINK-8343) Add support for job cluster deployment

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323104#comment-16323104
 ] 

ASF GitHub Bot commented on FLINK-8343:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5229
  
Thanks for the review @GJL. Merged onto the latest master.


> Add support for job cluster deployment
> --
>
> Key: FLINK-8343
> URL: https://issues.apache.org/jira/browse/FLINK-8343
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For Flip-6 we have to enable a different job cluster deployment. The 
> difference is that we directly submit the job when we deploy the Flink 
> cluster instead of following a two step approach (first deployment and then 
> submission).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5229: [FLINK-8343] [flip6] Remove Yarn specific commands from Y...

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5229
  
Thanks for the review @GJL. Merged onto the latest master.


---


[GitHub] flink pull request #5229: [FLINK-8343] [flip6] Remove Yarn specific commands...

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5229#discussion_r161089633
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -534,215 +557,235 @@ private Configuration 
applyYarnProperties(Configuration configuration) throws Fl
return effectiveConfiguration;
}
 
-   public int run(
-   String[] args,
-   Configuration configuration,
-   String configurationDirectory) {
+   public int run(String[] args) throws CliArgsException, FlinkException {
//
//  Command Line Options
//
-   Options options = new Options();
-   addGeneralOptions(options);
-   addRunOptions(options);
+   final CommandLine cmd = parseCommandLineOptions(args, true);
 
-   CommandLineParser parser = new PosixParser();
-   CommandLine cmd;
-   try {
-   cmd = parser.parse(options, args);
-   } catch (Exception e) {
-   System.out.println(e.getMessage());
-   printUsage();
-   return 1;
-   }
+   final AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createClusterDescriptor(cmd);
 
-   // Query cluster for metrics
-   if (cmd.hasOption(query.getOpt())) {
-   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
-   configuration,
-   configurationDirectory,
-   cmd.hasOption(flip6.getOpt()));
-   String description;
-   try {
-   description = 
yarnDescriptor.getClusterDescription();
-   } catch (Exception e) {
-   System.err.println("Error while querying the 
YARN cluster for available resources: " + e.getMessage());
-   e.printStackTrace(System.err);
-   return 1;
-   }
-   System.out.println(description);
-   return 0;
-   } else if (cmd.hasOption(applicationId.getOpt())) {
+   try {
+   // Query cluster for metrics
+   if (cmd.hasOption(query.getOpt())) {
+   final String description = 
yarnClusterDescriptor.getClusterDescription();
+   System.out.println(description);
+   return 0;
+   } else {
+   final ClusterClient clusterClient;
+   final ApplicationId yarnApplicationId;
 
-   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
-   configuration,
-   configurationDirectory,
-   cmd.hasOption(flip6.getOpt()));
+   if (cmd.hasOption(applicationId.getOpt())) {
+   yarnApplicationId = 
ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
 
-   //configure ZK namespace depending on the value passed
-   String zkNamespace = 
cmd.hasOption(zookeeperNamespace.getOpt()) ?
-   
cmd.getOptionValue(zookeeperNamespace.getOpt())
-   : 
yarnDescriptor.getFlinkConfiguration()
-   
.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
-   LOG.info("Going to use the ZK namespace: {}", 
zkNamespace);
-   
yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
+   clusterClient = 
yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
+   } else {
+   final ClusterSpecification 
clusterSpecification = getClusterSpecification(cmd);
 
-   try {
-   yarnCluster = 
yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
-   } catch (Exception e) {
-   throw new RuntimeException("Could not retrieve 
existing Yarn application", e);
-   }
+   clusterClient = 
yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
 
-   if (detachedMode) {
-   

[jira] [Commented] (FLINK-8343) Add support for job cluster deployment

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323060#comment-16323060
 ] 

ASF GitHub Bot commented on FLINK-8343:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5229#discussion_r161090177
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -534,215 +557,235 @@ private Configuration 
applyYarnProperties(Configuration configuration) throws Fl
return effectiveConfiguration;
}
 
-   public int run(
-   String[] args,
-   Configuration configuration,
-   String configurationDirectory) {
+   public int run(String[] args) throws CliArgsException, FlinkException {
//
//  Command Line Options
//
-   Options options = new Options();
-   addGeneralOptions(options);
-   addRunOptions(options);
+   final CommandLine cmd = parseCommandLineOptions(args, true);
 
-   CommandLineParser parser = new PosixParser();
-   CommandLine cmd;
-   try {
-   cmd = parser.parse(options, args);
-   } catch (Exception e) {
-   System.out.println(e.getMessage());
-   printUsage();
-   return 1;
-   }
+   final AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createClusterDescriptor(cmd);
 
-   // Query cluster for metrics
-   if (cmd.hasOption(query.getOpt())) {
-   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
-   configuration,
-   configurationDirectory,
-   cmd.hasOption(flip6.getOpt()));
-   String description;
-   try {
-   description = 
yarnDescriptor.getClusterDescription();
-   } catch (Exception e) {
-   System.err.println("Error while querying the 
YARN cluster for available resources: " + e.getMessage());
-   e.printStackTrace(System.err);
-   return 1;
-   }
-   System.out.println(description);
-   return 0;
-   } else if (cmd.hasOption(applicationId.getOpt())) {
+   try {
+   // Query cluster for metrics
+   if (cmd.hasOption(query.getOpt())) {
+   final String description = 
yarnClusterDescriptor.getClusterDescription();
+   System.out.println(description);
+   return 0;
+   } else {
+   final ClusterClient clusterClient;
+   final ApplicationId yarnApplicationId;
 
-   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
-   configuration,
-   configurationDirectory,
-   cmd.hasOption(flip6.getOpt()));
+   if (cmd.hasOption(applicationId.getOpt())) {
+   yarnApplicationId = 
ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
 
-   //configure ZK namespace depending on the value passed
-   String zkNamespace = 
cmd.hasOption(zookeeperNamespace.getOpt()) ?
-   
cmd.getOptionValue(zookeeperNamespace.getOpt())
-   : 
yarnDescriptor.getFlinkConfiguration()
-   
.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
-   LOG.info("Going to use the ZK namespace: {}", 
zkNamespace);
--- End diff --

It should not matter.


> Add support for job cluster deployment
> --
>
> Key: FLINK-8343
> URL: https://issues.apache.org/jira/browse/FLINK-8343
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For Flip-6 we have to enable a different job cluster deployment. The 
> difference is that we directly submit the job when we deploy the Flink 
> cluster instead of following a two step approach (first deployment and 

[GitHub] flink pull request #5229: [FLINK-8343] [flip6] Remove Yarn specific commands...

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5229#discussion_r161090177
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -534,215 +557,235 @@ private Configuration 
applyYarnProperties(Configuration configuration) throws Fl
return effectiveConfiguration;
}
 
-   public int run(
-   String[] args,
-   Configuration configuration,
-   String configurationDirectory) {
+   public int run(String[] args) throws CliArgsException, FlinkException {
//
//  Command Line Options
//
-   Options options = new Options();
-   addGeneralOptions(options);
-   addRunOptions(options);
+   final CommandLine cmd = parseCommandLineOptions(args, true);
 
-   CommandLineParser parser = new PosixParser();
-   CommandLine cmd;
-   try {
-   cmd = parser.parse(options, args);
-   } catch (Exception e) {
-   System.out.println(e.getMessage());
-   printUsage();
-   return 1;
-   }
+   final AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createClusterDescriptor(cmd);
 
-   // Query cluster for metrics
-   if (cmd.hasOption(query.getOpt())) {
-   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
-   configuration,
-   configurationDirectory,
-   cmd.hasOption(flip6.getOpt()));
-   String description;
-   try {
-   description = 
yarnDescriptor.getClusterDescription();
-   } catch (Exception e) {
-   System.err.println("Error while querying the 
YARN cluster for available resources: " + e.getMessage());
-   e.printStackTrace(System.err);
-   return 1;
-   }
-   System.out.println(description);
-   return 0;
-   } else if (cmd.hasOption(applicationId.getOpt())) {
+   try {
+   // Query cluster for metrics
+   if (cmd.hasOption(query.getOpt())) {
+   final String description = 
yarnClusterDescriptor.getClusterDescription();
+   System.out.println(description);
+   return 0;
+   } else {
+   final ClusterClient clusterClient;
+   final ApplicationId yarnApplicationId;
 
-   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
-   configuration,
-   configurationDirectory,
-   cmd.hasOption(flip6.getOpt()));
+   if (cmd.hasOption(applicationId.getOpt())) {
+   yarnApplicationId = 
ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
 
-   //configure ZK namespace depending on the value passed
-   String zkNamespace = 
cmd.hasOption(zookeeperNamespace.getOpt()) ?
-   
cmd.getOptionValue(zookeeperNamespace.getOpt())
-   : 
yarnDescriptor.getFlinkConfiguration()
-   
.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
-   LOG.info("Going to use the ZK namespace: {}", 
zkNamespace);
--- End diff --

It should not matter.


---


[jira] [Commented] (FLINK-8343) Add support for job cluster deployment

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323054#comment-16323054
 ] 

ASF GitHub Bot commented on FLINK-8343:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5229#discussion_r161089633
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -534,215 +557,235 @@ private Configuration 
applyYarnProperties(Configuration configuration) throws Fl
return effectiveConfiguration;
}
 
-   public int run(
-   String[] args,
-   Configuration configuration,
-   String configurationDirectory) {
+   public int run(String[] args) throws CliArgsException, FlinkException {
//
//  Command Line Options
//
-   Options options = new Options();
-   addGeneralOptions(options);
-   addRunOptions(options);
+   final CommandLine cmd = parseCommandLineOptions(args, true);
 
-   CommandLineParser parser = new PosixParser();
-   CommandLine cmd;
-   try {
-   cmd = parser.parse(options, args);
-   } catch (Exception e) {
-   System.out.println(e.getMessage());
-   printUsage();
-   return 1;
-   }
+   final AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createClusterDescriptor(cmd);
 
-   // Query cluster for metrics
-   if (cmd.hasOption(query.getOpt())) {
-   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
-   configuration,
-   configurationDirectory,
-   cmd.hasOption(flip6.getOpt()));
-   String description;
-   try {
-   description = 
yarnDescriptor.getClusterDescription();
-   } catch (Exception e) {
-   System.err.println("Error while querying the 
YARN cluster for available resources: " + e.getMessage());
-   e.printStackTrace(System.err);
-   return 1;
-   }
-   System.out.println(description);
-   return 0;
-   } else if (cmd.hasOption(applicationId.getOpt())) {
+   try {
+   // Query cluster for metrics
+   if (cmd.hasOption(query.getOpt())) {
+   final String description = 
yarnClusterDescriptor.getClusterDescription();
+   System.out.println(description);
+   return 0;
+   } else {
+   final ClusterClient clusterClient;
+   final ApplicationId yarnApplicationId;
 
-   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
-   configuration,
-   configurationDirectory,
-   cmd.hasOption(flip6.getOpt()));
+   if (cmd.hasOption(applicationId.getOpt())) {
+   yarnApplicationId = 
ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
 
-   //configure ZK namespace depending on the value passed
-   String zkNamespace = 
cmd.hasOption(zookeeperNamespace.getOpt()) ?
-   
cmd.getOptionValue(zookeeperNamespace.getOpt())
-   : 
yarnDescriptor.getFlinkConfiguration()
-   
.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
-   LOG.info("Going to use the ZK namespace: {}", 
zkNamespace);
-   
yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
+   clusterClient = 
yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
+   } else {
+   final ClusterSpecification 
clusterSpecification = getClusterSpecification(cmd);
 
-   try {
-   yarnCluster = 
yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
-   } catch (Exception e) {
-   throw new RuntimeException("Could not retrieve 

[GitHub] flink pull request #5228: [FLINK-8342] [flip6] Remove generic type parameter...

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5228#discussion_r161087011
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -99,7 +99,7 @@
 /**
  * The descriptor with deployment information for spawning or resuming a 
{@link YarnClusterClient}.
  */
-public abstract class AbstractYarnClusterDescriptor implements 
ClusterDescriptor<YarnClusterClient> {
+public abstract class AbstractYarnClusterDescriptor implements 
ClusterDescriptor {
	private static final Logger LOG = 
LoggerFactory.getLogger(YarnClusterDescriptor.class);
--- End diff --

Not intended. Will change it.


---


[jira] [Commented] (FLINK-8342) Remove ClusterClient generic type parameter from ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323041#comment-16323041
 ] 

ASF GitHub Bot commented on FLINK-8342:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5228#discussion_r161087011
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -99,7 +99,7 @@
 /**
  * The descriptor with deployment information for spawning or resuming a 
{@link YarnClusterClient}.
  */
-public abstract class AbstractYarnClusterDescriptor implements 
ClusterDescriptor<YarnClusterClient> {
+public abstract class AbstractYarnClusterDescriptor implements 
ClusterDescriptor {
	private static final Logger LOG = 
LoggerFactory.getLogger(YarnClusterDescriptor.class);
--- End diff --

Not intended. Will change it.


> Remove ClusterClient generic type parameter from ClusterDescriptor
> --
>
> Key: FLINK-8342
> URL: https://issues.apache.org/jira/browse/FLINK-8342
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{ClusterDescriptor}} should not specialize the returned 
> {{ClusterClient}} type in order to develop code which can work with all 
> {{ClusterDescriptors}} and {{ClusterClients}}. Therefore, I propose to remove 
> the generic type parameter from {{ClusterDescriptor}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8341) Remove unneeded CommandLineOptions

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323034#comment-16323034
 ] 

ASF GitHub Bot commented on FLINK-8341:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5227
  
Thanks for the review @GJL. Addressing comments and merging once Travis 
gives green light.


> Remove unneeded CommandLineOptions
> --
>
> Key: FLINK-8341
> URL: https://issues.apache.org/jira/browse/FLINK-8341
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> With the refactorings of the {{CliFrontend}} we no longer have to keep the 
> JobManager address and commandLine in the {{CommandLineOptions}}. Therefore, 
> these fields should be removed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5227: [FLINK-8341] [flip6] Remove not needed options from Comma...

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5227
  
Thanks for the review @GJL. Addressing comments and merging once Travis 
gives green light.


---


[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322854#comment-16322854
 ] 

ASF GitHub Bot commented on FLINK-8299:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5207
  
Merging this PR. Thanks for your contribution @GJL.


> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8299
> URL: https://issues.apache.org/jira/browse/FLINK-8299
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Created] (FLINK-8415) Unprotected access to recordsToSend in LongRecordWriterThread#shutdown()

2018-01-11 Thread Ted Yu (JIRA)
Ted Yu created FLINK-8415:
-

 Summary: Unprotected access to recordsToSend in 
LongRecordWriterThread#shutdown()
 Key: FLINK-8415
 URL: https://issues.apache.org/jira/browse/FLINK-8415
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
  public void shutdown() {
running = false;
recordsToSend.complete(0L);
{code}
In other methods, access to recordsToSend is protected by the synchronized 
keyword.

shutdown() should do the same.
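A minimal sketch of the suggested fix: every method that touches {{recordsToSend}}, including {{shutdown()}}, synchronizes on the same lock. Everything beyond the quoted snippet (the reset method, the result accessor, field initialization) is assumed here for illustration and is not the actual benchmark code.

```java
import java.util.concurrent.CompletableFuture;

class LongRecordWriterThread {
    private volatile boolean running = true;
    private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();

    // Other methods already guard the field like this (method name assumed):
    public synchronized void startSending() {
        recordsToSend = new CompletableFuture<>();
    }

    // The fix: shutdown() takes the same lock before touching recordsToSend,
    // so its write is consistent with concurrent updates by the other
    // synchronized methods.
    public synchronized void shutdown() {
        running = false;
        recordsToSend.complete(0L);
    }

    // Hypothetical accessor for the sketch; returns -1 if not yet completed.
    public synchronized long getResultNow() {
        return recordsToSend.getNow(-1L);
    }

    public boolean isRunning() {
        return running;
    }
}
```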





[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2018-01-11 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322789#comment-16322789
 ] 

Elias Levy commented on FLINK-7935:
---

So it seems the DD reporter needs to switch from 
{{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} to 
{{MetricGroup#getMetricIdentifier(String)}}. That would be sufficient for my 
immediate use case, as I am only looking to add a single user-supplied 
scope/tag.

That said, I can see {{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} 
becoming cumbersome if a user wishes to use multiple key-value pairs/tags, e.g.:

{code}
getRuntimeContext()
  .getMetricGroup()
  .addGroup("messages")
  .addGroup("type", messageType)
  .addGroup("source", messageSource)
  .addGroup("priority", messagePriority)
  .counter("count")
{code}

would be named {{.messages.type.source.priority.count}} 
instead of just {{.messages.count}} with variables/tags 
{{type}}, {{source}}, and {{priority}}.

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default 
>  the System scope for operator metrics is 
> {{.taskmanager}}.  
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work.  The DataDog Flink metrics plugin submits all scope 
> variables as tags, even if they are not used within the scope format.  And 
> internally this does not appear to lead to metrics conflicting with each other.
> We would like to extend this to user defined metrics, so that you can define 
> variables/tags when adding a metric group or metric with the user API. In DD 
> we would then have a single metric with a tag taking many different values, 
> rather than hundreds of metrics for just the one value we want to measure 
> across different event types.





[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322541#comment-16322541
 ] 

ASF GitHub Bot commented on FLINK-8299:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r161010273
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
		}
	}
 
+	private JobExecutionResult waitForJobExecutionResult(
+			final JobID jobId) throws ProgramInvocationException {
+
+		final JobMessageParameters messageParameters = new JobMessageParameters();
+		messageParameters.jobPathParameter.resolve(jobId);
+		JobExecutionResultResponseBody jobExecutionResultResponseBody;
+		try {
+			long attempt = 0;
+			do {
+				final CompletableFuture responseFuture =
+					restClient.sendRequest(
+						restClusterClientConfiguration.getRestServerAddress(),
+						restClusterClientConfiguration.getRestServerPort(),
+						JobExecutionResultHeaders.getInstance(),
+						messageParameters);
+				jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+				Thread.sleep(waitStrategy.sleepTime(attempt));
+				attempt++;
+			}
+			while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
--- End diff --

Alright, then let's do it there.


> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8299
> URL: https://issues.apache.org/jira/browse/FLINK-8299
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.









[jira] [Commented] (FLINK-8233) Expose JobExecutionResult via HTTP

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322480#comment-16322480
 ] 

ASF GitHub Bot commented on FLINK-8233:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5194


> Expose JobExecutionResult via HTTP
> --
>
> Key: FLINK-8233
> URL: https://issues.apache.org/jira/browse/FLINK-8233
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Expose {{JobExecutionResult}} from a finished Flink job via HTTP:
> * Add a new AbstractRestHandler that returns the information in 
> {{JobExecutionResult}}.
> * Register new handler in {{WebMonitorEndpoint}}.





[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322477#comment-16322477
 ] 

ASF GitHub Bot commented on FLINK-8328:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5215


> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> 
>
> Key: FLINK-8328
> URL: https://issues.apache.org/jira/browse/FLINK-8328
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to 
> pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. 
> I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has 
> also the benefit of separating concerns better.





[jira] [Closed] (FLINK-8329) Move YarnClient out of YarnClusterClient

2018-01-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8329.

Resolution: Fixed

Fixed via 156b8935ef76eb53456cea1d40fd528ccefa21d8

> Move YarnClient out of YarnClusterClient
> 
>
> Key: FLINK-8329
> URL: https://issues.apache.org/jira/browse/FLINK-8329
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Move the {{YarnClient}} from the {{YarnClusterClient}} to the 
> {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle 
> management of the {{YarnClient}}. This change is a clean up task which will 
> better structure the client code.





[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322478#comment-16322478
 ] 

ASF GitHub Bot commented on FLINK-8329:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5216


> Move YarnClient out of YarnClusterClient
> 
>
> Key: FLINK-8329
> URL: https://issues.apache.org/jira/browse/FLINK-8329
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Move the {{YarnClient}} from the {{YarnClusterClient}} to the 
> {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle 
> management of the {{YarnClient}}. This change is a clean up task which will 
> better structure the client code.





[jira] [Closed] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient

2018-01-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8328.

Resolution: Fixed

Fixed via 2ce5b98da04cb3850ff91757cc4b74a98b8ce082

> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> 
>
> Key: FLINK-8328
> URL: https://issues.apache.org/jira/browse/FLINK-8328
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to 
> pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. 
> I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has 
> also the benefit of separating concerns better.









[jira] [Closed] (FLINK-8332) Move dispose savepoint into ClusterClient

2018-01-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8332.

Resolution: Fixed

Fixed via c2492e9b220c6c9a64b47bcdc76a2194d9f4d669

> Move dispose savepoint into ClusterClient
> -
>
> Key: FLINK-8332
> URL: https://issues.apache.org/jira/browse/FLINK-8332
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality we should move it to the 
> {{ClusterClient}}. That way we can have different implementations of the 
> {{ClusterClient}} (Flip-6 and old code) which are used by the same 
> {{CliFrontend}}.





[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322479#comment-16322479
 ] 

ASF GitHub Bot commented on FLINK-8332:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5219


> Move dispose savepoint into ClusterClient
> -
>
> Key: FLINK-8332
> URL: https://issues.apache.org/jira/browse/FLINK-8332
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality we should move it to the 
> {{ClusterClient}}. That way we can have different implementations of the 
> {{ClusterClient}} (Flip-6 and old code) which are used by the same 
> {{CliFrontend}}.







[jira] [Closed] (FLINK-8233) Expose JobExecutionResult via HTTP

2018-01-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8233.

Resolution: Fixed

Fixed via 86892b8e76a4e4b26cedf38c0695c53814a7f04f

> Expose JobExecutionResult via HTTP
> --
>
> Key: FLINK-8233
> URL: https://issues.apache.org/jira/browse/FLINK-8233
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Expose {{JobExecutionResult}} from a finished Flink job via HTTP:
> * Add a new AbstractRestHandler that returns the information in 
> {{JobExecutionResult}}.
> * Register new handler in {{WebMonitorEndpoint}}.







[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322467#comment-16322467
 ] 

ASF GitHub Bot commented on FLINK-8299:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5207#discussion_r160997382
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -148,6 +181,39 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
		}
	}
 
+	private JobExecutionResult waitForJobExecutionResult(
+			final JobID jobId) throws ProgramInvocationException {
+
+		final JobMessageParameters messageParameters = new JobMessageParameters();
+		messageParameters.jobPathParameter.resolve(jobId);
+		JobExecutionResultResponseBody jobExecutionResultResponseBody;
+		try {
+			long attempt = 0;
+			do {
+				final CompletableFuture responseFuture =
+					restClient.sendRequest(
+						restClusterClientConfiguration.getRestServerAddress(),
+						restClusterClientConfiguration.getRestServerPort(),
+						JobExecutionResultHeaders.getInstance(),
+						messageParameters);
+				jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+				Thread.sleep(waitStrategy.sleepTime(attempt));
+				attempt++;
+			}
+			while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
--- End diff --

Can change it to `getStatus().getId()` to avoid redundancy. However, the 
code will be touched once more in #5223. I can do it there.


> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8299
> URL: https://issues.apache.org/jira/browse/FLINK-8299
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.



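The {{RestClusterClient}} loop reviewed above polls the REST endpoint, sleeping between attempts according to a wait strategy, until the returned queue status is COMPLETED. A self-contained sketch of that pattern follows; all class and method names below are simplified stand-ins chosen for illustration, not Flink's actual API.

```java
import java.util.function.Supplier;

public class PollingSketch {
    enum StatusId { IN_PROGRESS, COMPLETED }

    interface WaitStrategy {
        long sleepTime(long attempt);
    }

    /** Exponential backoff, capped at maxSleepMillis. */
    static final class ExponentialWaitStrategy implements WaitStrategy {
        private final long initial, max;
        ExponentialWaitStrategy(long initialSleepMillis, long maxSleepMillis) {
            this.initial = initialSleepMillis;
            this.max = maxSleepMillis;
        }
        @Override
        public long sleepTime(long attempt) {
            // double the sleep each attempt, but never exceed the cap
            return Math.min(initial << Math.min(attempt, 30), max);
        }
    }

    // Mirrors the reviewed do-while: one request per iteration, then sleep
    // per the wait strategy, until the status reaches COMPLETED.
    static StatusId pollUntilCompleted(Supplier<StatusId> request, WaitStrategy waitStrategy)
            throws InterruptedException {
        long attempt = 0;
        StatusId status;
        do {
            status = request.get();  // one REST round trip in the real code
            Thread.sleep(waitStrategy.sleepTime(attempt));
            attempt++;
        } while (status != StatusId.COMPLETED);
        return status;
    }
}
```

Note that, as in the reviewed diff, the sketch sleeps once more after the final (COMPLETED) response before the loop condition is checked.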




[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322459#comment-16322459
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160995333
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---
@@ -51,45 +45,23 @@ public String getId() {
	}
 
	@Override
-	public void addRunOptions(Options baseOptions) {
-	}
+	public ClusterDescriptor createClusterDescriptor(
+			Configuration configuration,
+			String configurationDirectory,
+			CommandLine commandLine) {
+		final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(configuration, commandLine);
 
-	@Override
-	public void addGeneralOptions(Options baseOptions) {
+		return new StandaloneClusterDescriptor(effectiveConfiguration);
	}
 
	@Override
-	public StandaloneClusterClient retrieveCluster(
-			CommandLine commandLine,
-			Configuration config,
-			String configurationDirectory) {
-
-		if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
-			String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
-			InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
-			setJobManagerAddressInConfig(config, jobManagerAddress);
-		}
-
-		if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
-			String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-			config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
-		}
-
-		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
-		return descriptor.retrieve(null);
+	@Nullable
+	public String getClusterId(Configuration configuration, CommandLine commandLine) {
+		return "standalone";
	}
 
	@Override
-	public StandaloneClusterClient createCluster(
-			String applicationName,
-			CommandLine commandLine,
-			Configuration config,
-			String configurationDirectory,
-			List userJarFiles) throws UnsupportedOperationException {
-
-		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
-		ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);
-
-		return descriptor.deploySessionCluster(clusterSpecification);
+	public ClusterSpecification getClusterSpecification(Configuration configuration, CommandLine commandLine) {
+		return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
--- End diff --

ok


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 





[jira] [Commented] (FLINK-8233) Expose JobExecutionResult via HTTP

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322452#comment-16322452
 ] 

ASF GitHub Bot commented on FLINK-8233:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5194
  
Travis passed. Merging this PR. Thanks for your contribution.


> Expose JobExecutionResult via HTTP
> --
>
> Key: FLINK-8233
> URL: https://issues.apache.org/jira/browse/FLINK-8233
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Expose {{JobExecutionResult}} from a finished Flink job via HTTP:
> * Add a new AbstractRestHandler that returns the information in 
> {{JobExecutionResult}}.
> * Register new handler in {{WebMonitorEndpoint}}.





[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322453#comment-16322453
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160994352
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception {
 
	final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
 
-	final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, configuration, configurationDirectory);
+	final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(
--- End diff --

ok


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 









[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322425#comment-16322425
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5225
  
Thanks for the review @GJL. I've addressed most of your comments and 
rebased onto the latest master.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} currently is able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns it would be good 
> if the {{CustomCommandLine}} would simply return a {{ClusterDescriptor}} 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 







[jira] [Commented] (FLINK-6892) Add L/RPAD supported in SQL

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322397#comment-16322397
 ] 

ASF GitHub Bot commented on FLINK-6892:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4127
  
@twalthr Thanks for the review! I have updated the PR. I would be very 
grateful if you could review it again.

Thanks, Jincheng


> Add L/RPAD supported in SQL
> ---
>
> Key: FLINK-6892
> URL: https://issues.apache.org/jira/browse/FLINK-6892
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> L/RPAD(str,len,padstr) Returns the string str, left/right-padded with the 
> string padstr to a length of len characters. If str is longer than len, the 
> return value is shortened to len characters.
> * Syntax:
> LPAD(str,len,padstr) 
> * Arguments
> **str: the string to pad
> **len: the target length in characters
> **padstr: the string to pad with
> * Return Types
>   String
> * Example:
>   LPAD('hi',4,'??') -> '??hi'
>   LPAD('hi',1,'??') -> 'h'
>   RPAD('hi',4,'??') -> 'hi??'
>   RPAD('hi',1,'??') -> 'h'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_lpad]
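
The padding and truncation behavior described above can be sketched in plain Java as follows. This is a minimal illustration of the MySQL-style semantics, not the actual Flink implementation proposed in the PR.

```java
public class PadFunctions {

    // Left-pads str with padstr up to len characters;
    // truncates to len characters if str is already longer.
    public static String lpad(String str, int len, String padstr) {
        if (str.length() >= len) {
            return str.substring(0, len);
        }
        if (padstr.isEmpty()) {
            // Guard: nothing to pad with, so padding can never reach len.
            return str;
        }
        StringBuilder pad = new StringBuilder();
        while (pad.length() < len - str.length()) {
            pad.append(padstr);
        }
        return pad.substring(0, len - str.length()) + str;
    }

    // Right-pads str with padstr up to len characters;
    // truncates to len characters if str is already longer.
    public static String rpad(String str, int len, String padstr) {
        if (str.length() >= len) {
            return str.substring(0, len);
        }
        if (padstr.isEmpty()) {
            return str;
        }
        StringBuilder sb = new StringBuilder(str);
        while (sb.length() < len) {
            sb.append(padstr);
        }
        return sb.substring(0, len);
    }

    public static void main(String[] args) {
        System.out.println(lpad("hi", 4, "??")); // ??hi
        System.out.println(lpad("hi", 1, "??")); // h
        System.out.println(rpad("hi", 4, "??")); // hi??
        System.out.println(rpad("hi", 1, "??")); // h
    }
}
```

The four calls in {{main}} reproduce the examples from the issue description.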



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4127: [FLINK-6892][table]Add L/RPAD supported in SQL

2018-01-11 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4127
  
@twalthr Thanks for the review! I have updated the PR. I would be very 
grateful if you could review again.

Thanks, Jincheng


---


[jira] [Commented] (FLINK-8339) Let CustomCommandLine return a ClusterDescriptor

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322396#comment-16322396
 ] 

ASF GitHub Bot commented on FLINK-8339:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160985701
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception {
 
final CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(commandLine);
 
-   final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, configuration, 
configurationDirectory);
+   final ClusterDescriptor clusterDescriptor = 
activeCommandLine.createClusterDescriptor(
--- End diff --

You're right. I would like to postpone addressing this issue until we've 
introduced the typed cluster id. Otherwise I fear that it would cause quite 
a few merge conflicts.


> Let CustomCommandLine return a ClusterDescriptor
> 
>
> Key: FLINK-8339
> URL: https://issues.apache.org/jira/browse/FLINK-8339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{CustomCommandLine}} is currently able to retrieve a {{ClusterClient}} 
> and deploy a cluster. In order to better separate concerns, it would be good 
> if the {{CustomCommandLine}} simply returned a {{ClusterDescriptor}}, 
> which could then be used to retrieve a {{ClusterClient}} or to deploy a Flink 
> cluster. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5225: [FLINK-8339] [flip6] Let CustomCommandLine return ...

2018-01-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5225#discussion_r160985701
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -457,7 +499,19 @@ protected int stop(String[] args) throws Exception {
 
final CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(commandLine);
 
-   final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, configuration, 
configurationDirectory);
+   final ClusterDescriptor clusterDescriptor = 
activeCommandLine.createClusterDescriptor(
--- End diff --

You're right. I would like to postpone addressing this issue until we've 
introduced the typed cluster id. Otherwise I fear that it would cause quite 
a few merge conflicts.


---

