[jira] [Commented] (FLINK-8022) Kafka at-least-once tests fail occasionally

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266685#comment-16266685
 ] 

ASF GitHub Bot commented on FLINK-8022:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5078

[FLINK-8022][kafka] Bump at-least-once timeout in tests

Increasing the timeout for reading the records from 30s to 60s seems to fix
the failing at-least-once tests.

This is a minor test fix intended to improve test stability.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8022

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5078.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5078


commit 31a0f9ecb6f39210e2b7dacee9bcf90f44ceee1f
Author: Piotr Nowojski 
Date:   2017-11-23T11:53:37Z

[FLINK-8022][kafka] Bump at-least-once timeout in tests

Increasing the timeout for reading the records from 30s to 60s seems to fix
the failing at-least-once tests.




> Kafka at-least-once tests fail occasionally
> ---
>
> Key: FLINK-8022
> URL: https://issues.apache.org/jira/browse/FLINK-8022
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}}
>  seems to sporadically fail with missing data, like this execution:
> {code}
> 
> Test 
> testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  is running.
> 
> 17:54:30,195 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Creating topic oneToOneTopicRegularSink
> 17:54:30,196 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - In 
> getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436
> 17:54:30,204 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Starting ZkClient event thread.
> 17:54:30,204 INFO  org.I0Itec.zkclient.ZkClient   
>- Waiting for keeper state SyncConnected
> 17:54:30,240 INFO  org.I0Itec.zkclient.ZkClient   
>- zookeeper state changed (SyncConnected)
> 17:54:30,261 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Terminate ZkClient event thread.
> 17:54:30,265 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - Topic 
> oneToOneTopicRegularSink create request is successfully posted
> 17:54:30,366 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Validating if the topic oneToOneTopicRegularSink has been created or not
> 17:54:30,373 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - In 
> getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436
> 17:54:30,374 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Starting ZkClient event thread.
> 17:54:30,374 INFO  org.I0Itec.zkclient.ZkClient   
>- Waiting for keeper state SyncConnected
> 17:54:30,404 INFO  org.I0Itec.zkclient.ZkClient   
>- zookeeper state changed (SyncConnected)
> 17:54:30,420 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - topic 
> oneToOneTopicRegularSink has been created successfully
> 17:54:30,421 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Terminate ZkClient event thread.
> 17:54:31,099 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - 
> Starting FlinkKafkaProducer (1/1) to produce into default topic 
> oneToOneTopicRegularSink
> 17:55:05,229 ERROR 
> org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase  - 
> 
> Test 
> testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  failed with:
> java.lang.AssertionError: Expected to contain all of: <[0, 1, 2, 3, 4, 5, 6, 
> 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 
> 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 
> 46, 47, 

[GitHub] flink issue #4594: [FLINK-7517][network] let NettyBufferPool extend PooledBy...

2017-11-27 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4594
  
Yes @zhijiangW, I kept the original logic.
I guess the reasoning behind using only off-heap netty buffers was to reduce 
the overhead before transmitting messages over the wire: 1) we somewhat reduce 
GC overhead and 2) at some point we need the memory to be off-heap and put 
into kernel space anyway; depending on netty, this may be optimised if it 
is already off-heap.

Also, starting with #4481 we will only be using off-heap network buffers 
anyway.


---


[jira] [Commented] (FLINK-7517) let NettyBufferPool extend PooledByteBufAllocator

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266688#comment-16266688
 ] 

ASF GitHub Bot commented on FLINK-7517:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4594
  
Yes @zhijiangW, I kept the original logic.
I guess the reasoning behind using only off-heap netty buffers was to reduce 
the overhead before transmitting messages over the wire: 1) we somewhat reduce 
GC overhead and 2) at some point we need the memory to be off-heap and put 
into kernel space anyway; depending on netty, this may be optimised if it 
is already off-heap.

Also, starting with #4481 we will only be using off-heap network buffers 
anyway.


> let NettyBufferPool extend PooledByteBufAllocator
> -
>
> Key: FLINK-7517
> URL: https://issues.apache.org/jira/browse/FLINK-7517
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{NettyBufferPool}} wraps {{PooledByteBufAllocator}}, but as a result, any 
> allocated buffer's {{alloc()}} method returns the wrapped 
> {{PooledByteBufAllocator}}, which allows heap buffers again. By extending 
> {{PooledByteBufAllocator}} instead, we close this loophole and also restore 
> the invariant that a copy of a buffer has the same allocator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7530) Port existing REST handlers to support Flip-6 components

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266744#comment-16266744
 ] 

ASF GitHub Bot commented on FLINK-7530:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5079

[FLINK-7530][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler` to 
the new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that 
SubtaskMetrics can be queried in FLIP-6 standalone mode.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7530

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5079.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5079


commit 24d2ab0a600d476142cfadd453074b01506ba591
Author: gyao 
Date:   2017-11-27T12:30:18Z

[FLINK-7530][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

Migrate logic from
org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler 
to
new handler. Add new handler to DispatcherRestEndpoint.

commit 2ef55c4d4e94d287f5e8b924e35c79bf53ab957a
Author: gyao 
Date:   2017-11-27T12:32:52Z

[hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler

commit b95b364640b43ba90f62d8c82a6d6c7f93e4ce65
Author: gyao 
Date:   2017-11-27T12:34:02Z

[hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest




> Port existing REST handlers to support Flip-6 components
> 
>
> Key: FLINK-7530
> URL: https://issues.apache.org/jira/browse/FLINK-7530
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> This is the umbrella issue for porting the existing REST handlers to work 
> together with the new {{RestServerEndpoint}} and the {{AbstractRestHandler}}. 
> This is a prerequisite for making them work with the Flip-6 {{Dispatcher}} and 
> {{JobMaster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5079: [FLINK-7530][flip6] Migrate SubtaskMetricsHandler ...

2017-11-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5079

[FLINK-7530][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler` to 
the new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that 
SubtaskMetrics can be queried in FLIP-6 standalone mode.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7530

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5079.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5079


commit 24d2ab0a600d476142cfadd453074b01506ba591
Author: gyao 
Date:   2017-11-27T12:30:18Z

[FLINK-7530][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

Migrate logic from
org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler 
to
new handler. Add new handler to DispatcherRestEndpoint.

commit 2ef55c4d4e94d287f5e8b924e35c79bf53ab957a
Author: gyao 
Date:   2017-11-27T12:32:52Z

[hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler

commit b95b364640b43ba90f62d8c82a6d6c7f93e4ce65
Author: gyao 
Date:   2017-11-27T12:34:02Z

[hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest




---


[jira] [Created] (FLINK-8159) Add rich support for SelectWrapper and FlatSelectWrapper

2017-11-27 Thread Dian Fu (JIRA)
Dian Fu created FLINK-8159:
--

 Summary: Add rich support for SelectWrapper and FlatSelectWrapper
 Key: FLINK-8159
 URL: https://issues.apache.org/jira/browse/FLINK-8159
 Project: Flink
  Issue Type: Sub-task
  Components: CEP
Reporter: Dian Fu
Assignee: Dian Fu


{{SelectWrapper}} and {{FlatSelectWrapper}} should extend 
{{AbstractRichFunction}} and work properly if the underlying functions 
extend {{RichFunction}}.
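A minimal sketch of the proposed delegation, with hypothetical class names standing in for the actual CEP classes:

```java
// Sketch (hypothetical names, not the actual Flink CEP classes) of the
// proposed fix: the wrapper extends the rich base class and forwards
// lifecycle calls to the wrapped function when that function is itself rich.
interface Function {}

abstract class AbstractRichFunction implements Function {
    public void open() {}
    public void close() {}
}

class SelectWrapper extends AbstractRichFunction {
    private final Function wrapped;

    SelectWrapper(Function wrapped) { this.wrapped = wrapped; }

    @Override
    public void open() {
        // only rich functions have lifecycle methods to forward to
        if (wrapped instanceof AbstractRichFunction) {
            ((AbstractRichFunction) wrapped).open();
        }
    }
}

public class RichWrapperDemo {
    static class CountingFunction extends AbstractRichFunction {
        int opened;
        @Override public void open() { opened++; }
    }

    public static void main(String[] args) {
        CountingFunction inner = new CountingFunction();
        new SelectWrapper(inner).open();
        System.out.println(inner.opened);
    }
}
```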



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8158) Rowtime window inner join emits late data

2017-11-27 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-8158:
--

 Summary: Rowtime window inner join emits late data
 Key: FLINK-8158
 URL: https://issues.apache.org/jira/browse/FLINK-8158
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


When executing the join, the join operator needs to make sure that no late data 
is emitted. Currently, this is achieved by holding back watermarks. However, the 
window border is not handled correctly. For the SQL below: 
{code}
val sqlQuery =
  """
    |SELECT t2.key, t2.id, t1.id
    |FROM T1 as t1 join T2 as t2 ON
    |  t1.key = t2.key AND
    |  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
    |    t2.rt + INTERVAL '1' SECOND
    |""".stripMargin

val data1 = new mutable.MutableList[(String, String, Long)]
// for boundary test
data1.+=(("A", "LEFT1", 6000L))

val data2 = new mutable.MutableList[(String, String, Long)]
data2.+=(("A", "RIGHT1", 6000L))
{code}

The join will output a watermark with timestamp 1000, but if the left side then 
receives another record ("A", "LEFT1", 1000L), the join will output a record 
with timestamp 1000, which equals the previous watermark, i.e. late data.
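The boundary condition can be illustrated with a small sketch, assuming Flink's convention that a watermark t promises no further elements with timestamp <= t (the helper and constants below are illustrative, not the join operator's actual code):

```java
public class WatermarkBoundaryDemo {
    // A watermark t asserts that no element with timestamp <= t follows,
    // so a record emitted at exactly the watermark timestamp is late.
    static boolean isLate(long recordTs, long currentWatermark) {
        return recordTs <= currentWatermark;
    }

    public static void main(String[] args) {
        long rightTs = 6000L;      // ("A", "RIGHT1", 6000L)
        long lowerBoundMs = 5000L; // t2.rt - INTERVAL '5' SECOND
        long emittedWatermark = rightTs - lowerBoundMs; // watermark 1000

        // a later left record at timestamp 1000 joins and is emitted at
        // timestamp 1000, equal to the previously emitted watermark: late
        System.out.println(isLate(1000L, emittedWatermark));

        // holding the watermark back one more millisecond would avoid this
        System.out.println(isLate(1000L, emittedWatermark - 1));
    }
}
```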



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5082: [FLINK-8143][flip6] Migrate SubtaskMetricsHandler ...

2017-11-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5082

[FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler` to 
the new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that 
SubtaskMetrics can be queried in FLIP-6 standalone mode.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8143

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5082


commit d32d978e1826df6fab8f5b4a27d47b4367d54ece
Author: gyao 
Date:   2017-11-27T12:30:18Z

[FLINK-8150][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

Migrate logic from
org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler 
to
new handler. Add new handler to DispatcherRestEndpoint.

commit 96c9f671d6320d5d1d014d5df27f4a3a07417993
Author: gyao 
Date:   2017-11-27T12:32:52Z

[hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler

commit c369d733fecc71a7581b5a6cbf196b35ecdfed12
Author: gyao 
Date:   2017-11-27T12:34:02Z

[hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest




---


[GitHub] flink pull request #5079: [FLINK-8143][flip6] Migrate SubtaskMetricsHandler ...

2017-11-27 Thread GJL
Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/5079


---


[GitHub] flink issue #5049: [FLINK-8081][metrics] Annotate 'MetricRegistry#getReporte...

2017-11-27 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5049
  
merging.


---


[jira] [Commented] (FLINK-8081) Annotate MetricRegistry#getReporters() with @VisibleForTesting

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266693#comment-16266693
 ] 

ASF GitHub Bot commented on FLINK-8081:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5049
  
merging.


> Annotate MetricRegistry#getReporters() with @VisibleForTesting
> --
>
> Key: FLINK-8081
> URL: https://issues.apache.org/jira/browse/FLINK-8081
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> {{MetricRegistry#getReporters()}} is only used for testing purposes to 
> provide access to instantiated reporters. We should annotate this method with 
> {{@VisibleForTesting}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5068: [FLINK-8122] [table] Name all built-in table sinks...

2017-11-27 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5068#discussion_r153179456
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -60,4 +60,14 @@ trait TableSource[T] {
 */
   def explainSource(): String = ""
 
+  /**
+* Gets the name which is used by the visualization and logging during 
runtime.
+*
+* @return Name of the [[TableSource]].
+*/
+  def getRuntimeName(): String = {
--- End diff --

Hi @fhueske, thanks for the suggestion. I'll remove this method and add a 
default implementation for `explainSource()`.
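The idea of a default implementation can be sketched in Java (hypothetical interface, not the actual Scala trait): derive a readable name from the implementing class instead of returning an empty string.

```java
// Hypothetical stand-in for the TableSource trait: explainSource() falls
// back to the implementing class's simple name rather than "".
interface TableSource {
    default String explainSource() {
        return getClass().getSimpleName();
    }
}

public class ExplainSourceDemo {
    static class CsvTableSource implements TableSource {}

    public static void main(String[] args) {
        // sources no longer show up as "Unnamed" in logs and the Web UI
        System.out.println(new CsvTableSource().explainSource());
    }
}
```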


---


[jira] [Commented] (FLINK-8122) Name all table sinks and sources

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266715#comment-16266715
 ] 

ASF GitHub Bot commented on FLINK-8122:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5068#discussion_r153179456
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -60,4 +60,14 @@ trait TableSource[T] {
 */
   def explainSource(): String = ""
 
+  /**
+* Gets the name which is used by the visualization and logging during 
runtime.
+*
+* @return Name of the [[TableSource]].
+*/
+  def getRuntimeName(): String = {
--- End diff --

Hi @fhueske, thanks for the suggestion. I'll remove this method and add a 
default implementation for `explainSource()`.


> Name all table sinks and sources
> 
>
> Key: FLINK-8122
> URL: https://issues.apache.org/jira/browse/FLINK-8122
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>
> Not all table sinks and sources have proper names. Therefore, they are 
> displayed as "Unnamed" in the logs and Web UI (e.g. CsvTableSink). We should 
> add names for all built-in connectors. Having information about the table 
> sink name (via {{INSERT INTO}}) would be even better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5081: [FLINK-7530][flip6] Migrate TaskManagerMetricsHand...

2017-11-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5081

[FLINK-7530][flip6] Migrate TaskManagerMetricsHandler to new 
RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler` 
to the new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that 
TaskManager metrics can be queried in FLIP-6 standalone mode. Note that the 
task manager ids exposed by the WebUI in FLIP-6 mode are currently broken 
(https://issues.apache.org/jira/browse/FLINK-8150). I had to obtain valid ids 
by setting a breakpoint in `ResourceManager`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7530-tm-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5081.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5081


commit 71ee76139e0703bea74623353c7175533582aa16
Author: gyao 
Date:   2017-11-27T12:57:48Z

[FLINK-7530][flip6] Migrate TaskManagerMetricsHandler to new 
RestServerEndpoint

Migrate logic in

org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler
to new handler, and add new handler to DispatcherRestEndpoint.




---


[jira] [Commented] (FLINK-8154) JobSubmissionClientActor submitted job, but there is no connection to a JobManager

2017-11-27 Thread Patrick Lucas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266782#comment-16266782
 ] 

Patrick Lucas commented on FLINK-8154:
--

Is `act-monitor-flink-jobmanager` a Kubernetes Service? If so, it looks like 
the Service's IP is `10.3.0.81`, but the Pod's IP is `10.2.43.51`. Depending 
on your network configuration, these may not be routable to each other.

Can you confirm connectivity by, for example, running `nc -v 10.2.43.51 6123` 
from within the Pod? (You may have to install `netcat`)
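The same check can also be done programmatically with a plain socket connect; the host and port below are simply the ones from the log above, not values this sketch can verify:

```java
import java.net.InetSocketAddress;
import java.net.Socket;

// Programmatic equivalent of `nc -v <ip> <port>`: attempt a TCP connect
// with a timeout and report whether it succeeded.
public class ConnCheck {
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pod IP and JobManager RPC port from the issue (hypothetical here)
        System.out.println(canConnect("10.2.43.51", 6123, 2000));
    }
}
```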

> JobSubmissionClientActor submitted job, but there is no connection to a 
> JobManager
> -
>
> Key: FLINK-8154
> URL: https://issues.apache.org/jira/browse/FLINK-8154
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.2
> Environment: Kubernetes 1.8.3, Platform "Linux/amd64"
>Reporter: Gregory Melekh
>Priority: Blocker
>
> The JobManager log file is below.
> 2017-11-26 08:17:13,435 INFO  org.apache.flink.client.CliFrontend 
>   - 
> 
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Starting Command Line Client (Version: 1.3.2, Rev:0399bee, 
> Date:03.08.2017 @ 10:23:11 UTC)
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Current user: root
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 
> 1.8/25.131-b11
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Maximum heap size: 6252 MiBytes
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2017-11-26 08:17:13,439 INFO  org.apache.flink.client.CliFrontend 
>   -  Hadoop version: 2.7.2
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  JVM Options:
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - 
> -Dlog.file=/opt/flink/log/flink--client-act-monitor-flink-jobmanager-66cd4bdb5c-8kxbh.log
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -Dlog4j.configuration=file:/etc/flink/log4j-cli.properties
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -Dlogback.configurationFile=file:/etc/flink/logback.xml
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  Program Arguments:
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - run
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -c
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - monitoring.flow.AccumulateAll
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - /tmp/monitoring-0.0.1-SNAPSHOT.jar
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  Classpath: 
> /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar:::
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - 
> 
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - Using configuration directory /etc/flink
> 2017-11-26 08:17:13,441 INFO  org.apache.flink.client.CliFrontend 
>   - Trying to load configuration file
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: blob.server.port, 6124
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.address, act-monitor-flink-jobmanager
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2017-11-26 08:17:13,443 INFO  
> 

[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...

2017-11-27 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5032
  
Thanks for the suggestion @aljoscha. 

The problem is that the state type is provided via a generic type parameter 
`S extends State`, which is erased at runtime. Thus it's hard to do type 
checking in `AbstractKeyedStateBackend` unless we explicitly store and check 
the type for each state name (and that may affect performance). The existing 
"leave alone" solution seems to be the most efficient one, but with it we can 
only get a `ClassCastException`. What do you think?
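A minimal sketch of the erasure problem, using a hypothetical mini-backend keyed only by state name (not Flink's actual API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// The requested state type S is erased at runtime, so a backend keyed only
// by state name cannot detect a type mismatch up front; the
// ClassCastException only surfaces when the caller uses the returned value.
public class ErasureDemo {
    static final Map<String, Object> statesByName = new HashMap<>();

    @SuppressWarnings("unchecked")
    static <S> S getState(String name, S initial) {
        // unchecked cast: erased at runtime, so no check happens here
        return (S) statesByName.computeIfAbsent(name, k -> initial);
    }

    static boolean mismatchDetectedLate() {
        statesByName.clear();
        getState("counter", new ArrayList<Integer>()); // registered as a list
        try {
            // compiles fine with S inferred as String, but the checkcast the
            // compiler inserts at the call site fails at runtime
            String s = ErasureDemo.<String>getState("counter", "init");
            return s == null; // not reached
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(mismatchDetectedLate());
    }
}
```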


---


[GitHub] flink pull request #5078: [FLINK-8022][kafka] Bump at-least-once timeout in ...

2017-11-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5078


---


[jira] [Commented] (FLINK-8022) Kafka at-least-once tests fail occasionally

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266694#comment-16266694
 ] 

ASF GitHub Bot commented on FLINK-8022:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5078
  
Thanks! I merged this.



> Kafka at-least-once tests fail occasionally
> ---
>
> Key: FLINK-8022
> URL: https://issues.apache.org/jira/browse/FLINK-8022
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}}
>  seems to sporadically fail with missing data, like this execution:
> {code}
> 
> Test 
> testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  is running.
> 
> 17:54:30,195 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Creating topic oneToOneTopicRegularSink
> 17:54:30,196 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - In 
> getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436
> 17:54:30,204 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Starting ZkClient event thread.
> 17:54:30,204 INFO  org.I0Itec.zkclient.ZkClient   
>- Waiting for keeper state SyncConnected
> 17:54:30,240 INFO  org.I0Itec.zkclient.ZkClient   
>- zookeeper state changed (SyncConnected)
> 17:54:30,261 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Terminate ZkClient event thread.
> 17:54:30,265 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - Topic 
> oneToOneTopicRegularSink create request is successfully posted
> 17:54:30,366 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Validating if the topic oneToOneTopicRegularSink has been created or not
> 17:54:30,373 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - In 
> getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436
> 17:54:30,374 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Starting ZkClient event thread.
> 17:54:30,374 INFO  org.I0Itec.zkclient.ZkClient   
>- Waiting for keeper state SyncConnected
> 17:54:30,404 INFO  org.I0Itec.zkclient.ZkClient   
>- zookeeper state changed (SyncConnected)
> 17:54:30,420 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - topic 
> oneToOneTopicRegularSink has been created successfully
> 17:54:30,421 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Terminate ZkClient event thread.
> 17:54:31,099 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - 
> Starting FlinkKafkaProducer (1/1) to produce into default topic 
> oneToOneTopicRegularSink
> 17:55:05,229 ERROR 
> org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase  - 
> 
> Test 
> testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  failed with:
> java.lang.AssertionError: Expected to contain all of: <[0, 1, 2, 3, 4, 5, 6, 
> 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 
> 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 
> 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 
> 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 
> 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 
> 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 
> 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 
> 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 
> 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 
> 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 
> 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 
> 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 
> 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 
> 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 
> 237, 238, 239, 240, 241, 242, 243, 

[jira] [Closed] (FLINK-8022) Kafka at-least-once tests fail occasionally

2017-11-27 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8022.
---
Resolution: Fixed
  Assignee: Piotr Nowojski  (was: Tzu-Li (Gordon) Tai)

Fixed on master in
da38a219a9abf31f53318b2a902bd064bfd0a775

Fixed on release-1.4 in
4a46507e980b880a20bd821ee55d51bc787df124


[jira] [Commented] (FLINK-8022) Kafka at-least-once tests fail occasionally

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266696#comment-16266696
 ] 

ASF GitHub Bot commented on FLINK-8022:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5078



[GitHub] flink issue #5078: [FLINK-8022][kafka] Bump at-least-once timeout in tests

2017-11-27 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5078
  
Thanks! I merged this 👍 



---


[jira] [Closed] (FLINK-8153) Upgrade to JDK 9

2017-11-27 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8153.
---
Resolution: Later

There are no plans at the moment to fully switch to JDK 9, i.e., to drop 
Java 8. We are, however, working on making Flink build and run on Java 9; see 
FLINK-8033.

> Upgrade to JDK 9
> 
>
> Key: FLINK-8153
> URL: https://issues.apache.org/jira/browse/FLINK-8153
> Project: Flink
>  Issue Type: Improvement
> Environment: Development
>Reporter: Paul Meshkovsky
>Priority: Minor
>
> Guys, any plans to upgrade to JDK 9? Maybe it should be considered as part 
> of the Flink 2.0 release. For example, some of the old connector technologies 
> could then be dropped; I think the Kafka 1.0 client should be compatible with 
> lower versions of Kafka. I am new to Flink, so just asking.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8160) Extend OperatorHarness to expose metrics

2017-11-27 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-8160:
---

 Summary: Extend OperatorHarness to expose metrics
 Key: FLINK-8160
 URL: https://issues.apache.org/jira/browse/FLINK-8160
 Project: Flink
  Issue Type: Improvement
  Components: Metrics, Streaming
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.5.0


To better test interactions between operators and metrics, the harness should 
expose the metrics registered by the operator.
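A minimal sketch of what such a harness extension could look like (all names here are hypothetical, not Flink's actual harness API): the harness captures metric registrations into a map so tests can assert on them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink's actual OperatorHarness API: capture every
// metric the operator registers so a test can later assert on it.
class MetricCapturingHarness {

    private final Map<String, Object> registeredMetrics = new HashMap<>();

    /** Invoked by a stubbed metric group whenever the operator registers a metric. */
    void onMetricRegistered(String name, Object metric) {
        registeredMetrics.put(name, metric);
    }

    /** Exposes a registered metric to the test, or null if none was registered. */
    Object getMetric(String name) {
        return registeredMetrics.get(name);
    }

    /** Number of metrics the operator registered. */
    int numRegisteredMetrics() {
        return registeredMetrics.size();
    }
}
```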





[jira] [Assigned] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem

2017-11-27 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-5789:
---

Assignee: (was: Gary Yao)

> Make Bucketing Sink independent of Hadoop's FileSystem
> --
>
> Key: FLINK-5789
> URL: https://issues.apache.org/jira/browse/FLINK-5789
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0, 1.1.4
>Reporter: Stephan Ewen
> Fix For: 1.5.0
>
>
> The {{BucketingSink}} is hard wired to Hadoop's FileSystem, bypassing Flink's 
> file system abstraction.
> This causes several issues:
>   - The bucketing sink behaves differently from other file sinks with 
> respect to configuration
>   - Directly supported file systems (not through Hadoop), like the MapR File 
> System, do not work in the same way with the BucketingSink as other file 
> systems
>   - The previous point is all the more problematic in the effort to make 
> Hadoop an optional dependency and to run in other stacks (Mesos, Kubernetes, 
> AWS, GCE, Azure) with ideally no Hadoop dependency.
> We should port the {{BucketingSink}} to use Flink's FileSystem classes.
> To support the *truncate* functionality that is needed for the exactly-once 
> semantics of the Bucketing Sink, we should extend Flink's FileSystem 
> abstraction to have the methods
>   - {{boolean supportsTruncate()}}
>   - {{void truncate(Path, long)}}
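A minimal sketch of the proposed extension, under stated assumptions: the method names come from the ticket, while the conservative defaults, exception choice, and use of `java.nio.file.Path` as a stand-in for Flink's own `Path` class are illustrative.

```java
import java.io.IOException;
import java.nio.file.Path;  // stand-in for Flink's own Path class

// Sketch of the proposed FileSystem extension; defaults are assumptions.
abstract class FileSystem {

    /** Whether this file system supports truncating files; needed for the
     *  exactly-once recovery logic of the bucketing sink. */
    public boolean supportsTruncate() {
        return false;  // safe default for file systems without truncate
    }

    /** Truncates the file at {@code path} to {@code length} bytes. */
    public void truncate(Path path, long length) throws IOException {
        throw new UnsupportedOperationException(
                "truncate() is not supported by " + getClass().getSimpleName());
    }
}
```

On recovery, a sink could first call `supportsTruncate()` and fall back to a valid-length marker mechanism when truncation is unavailable.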





[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...

2017-11-27 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5032
  
We could check based on the state descriptor.


---


[GitHub] flink pull request #5049: [FLINK-8081][metrics] Annotate 'MetricRegistry#get...

2017-11-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5049


---


[jira] [Closed] (FLINK-8081) Annotate MetricRegistry#getReporters() with @VisibleForTesting

2017-11-27 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8081.
---
   Resolution: Fixed
Fix Version/s: 1.4.1
   1.5.0

1.4: bd1e03374d0f240f5a0b406ace662e4391cce1d5
master: fbddf35faf59d5b762a71d42155852b29ef0da48

> Annotate MetricRegistry#getReporters() with @VisibleForTesting
> --
>
> Key: FLINK-8081
> URL: https://issues.apache.org/jira/browse/FLINK-8081
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0, 1.4.1
>
>
> {{MetricRegistry#getReporters()}} is only used for testing purposes to 
> provide access to instantiated reporters. We should annotate this method with 
> {{@VisibleForTesting}}.
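For illustration, the change amounts to documentation-by-annotation. In this sketch the annotation is a local stub standing in for Flink's `org.apache.flink.annotation.VisibleForTesting`, and the `MetricRegistry` internals are simplified.

```java
import java.util.ArrayList;
import java.util.List;

// Stub standing in for org.apache.flink.annotation.VisibleForTesting; the
// annotation has no runtime effect and only documents intent.
@interface VisibleForTesting {}

class MetricRegistry {
    private final List<Object> reporters = new ArrayList<>();

    /** Only intended for tests that need access to the instantiated reporters. */
    @VisibleForTesting
    List<Object> getReporters() {
        return reporters;
    }
}
```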





[jira] [Commented] (FLINK-8081) Annotate MetricRegistry#getReporters() with @VisibleForTesting

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266699#comment-16266699
 ] 

ASF GitHub Bot commented on FLINK-8081:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5049


> Annotate MetricRegistry#getReporters() with @VisibleForTesting
> --
>
> Key: FLINK-8081
> URL: https://issues.apache.org/jira/browse/FLINK-8081
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0, 1.4.1
>
>
> {{MetricRegistry#getReporters()}} is only used for testing purposes to 
> provide access to instantiated reporters. We should annotate this method with 
> {{@VisibleForTesting}}.





[GitHub] flink issue #5077: [docs] fix wrong package name

2017-11-27 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5077
  
nice catch, merging.


---


[jira] [Commented] (FLINK-7530) Port existing REST handlers to support Flip-6 components

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266765#comment-16266765
 ] 

ASF GitHub Bot commented on FLINK-7530:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5081

[FLINK-7530][flip6] Migrate TaskManagerMetricsHandler to new 
RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler` 
to the new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that 
TaskManager metrics can be queried in FLIP-6 standalone mode. Note that the 
task manager ids exposed by the WebUI in FLIP-6 mode are currently broken 
(https://issues.apache.org/jira/browse/FLINK-8150). I had to obtain valid ids 
by setting a breakpoint in `ResourceManager`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7530-tm-metrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5081.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5081


commit 71ee76139e0703bea74623353c7175533582aa16
Author: gyao 
Date:   2017-11-27T12:57:48Z

[FLINK-7530][flip6] Migrate TaskManagerMetricsHandler to new 
RestServerEndpoint

Migrate logic in

org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler
to new handler, and add new handler to DispatcherRestEndpoint.




> Port existing REST handlers to support Flip-6 components
> 
>
> Key: FLINK-7530
> URL: https://issues.apache.org/jira/browse/FLINK-7530
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> This is the umbrella issue for porting the existing REST handlers to work 
> together with the new {{RestServerEndpoint}} and the {{AbstractRestHandler}}. 
> This is the requirement to make them work with the Flip-6 {{Dispatcher}} and 
> {{JobMaster}}.





[jira] [Commented] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266800#comment-16266800
 ] 

ASF GitHub Bot commented on FLINK-7716:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5083

[FLINK-7716][flip6] Migrate JobManagerMetricsHandler to new 
RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler` 
to the new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that 
JobManager metrics can be queried in FLIP-6 standalone mode.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7716

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5083


commit 17244d63fcc1610c83a5c020a63fe045087a2f07
Author: gyao 
Date:   2017-11-27T13:18:48Z

[FLIP-7716][flip6] Migrate JobManagerMetricsHandler to new 
RestServerEndpoint

Migrate logic in

org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler
to new handler and add new handler to DispatcherRestEndpoint.

commit 51d37ccce04b912dcda6f5ae74d8092d22b5ff9d
Author: gyao 
Date:   2017-11-27T13:22:17Z

[FLINK-7716][Javadoc] Deprecate method MetricStore#getJobManager().

There is a semantically equivalent method in MetricStore.

commit c7283194b19719270e4c0c2cc185876e710b750a
Author: gyao 
Date:   2017-11-27T13:24:42Z

[hotfix][Javadoc] Fix typo in ConversionException




> Port JobManagerMetricsHandler to new REST endpoint
> --
>
> Key: FLINK-7716
> URL: https://issues.apache.org/jira/browse/FLINK-7716
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JobManagerMetricsHandler}} to new REST endpoint.





[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266805#comment-16266805
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5032
  
Thanks for the suggestion @aljoscha. 

The problem is that the state type is provided via a generic type parameter `S 
extends State`, which is erased at runtime. Thus it's hard to do type 
checking in `AbstractKeyedStateBackend` unless we explicitly store and check 
the type for each state name (and that may affect performance). The 
existing "leave alone" solution seems to be the most efficient way, but we can 
only get a `ClassCastException` with it. What do you think?
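The descriptor-based check discussed in this thread could be sketched as follows (hypothetical names, not Flink's actual backend code): remember the descriptor class per state name and fail fast with a descriptive message when a different descriptor type reuses the name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Flink's actual AbstractKeyedStateBackend code):
// store each state name's descriptor class and throw a descriptive error when
// a different state type is requested under the same name, instead of
// surfacing a ClassCastException later.
class StateNameRegistry {

    private final Map<String, Class<?>> descriptorTypeByName = new HashMap<>();

    void checkAndRegister(String stateName, Class<?> descriptorType) {
        Class<?> previous = descriptorTypeByName.putIfAbsent(stateName, descriptorType);
        if (previous != null && !previous.equals(descriptorType)) {
            throw new IllegalStateException(
                    "Duplicate state name '" + stateName + "': already registered with "
                            + previous.getSimpleName() + ", now requested with "
                            + descriptorType.getSimpleName() + ".");
        }
    }
}
```

The lookup happens once per state registration, not per record, which trades one map access for the clearer error message.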


> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor =
>     new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
>
> final ListStateDescriptor<Integer> secondListStateDescriptor =
>     new ListStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
>
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx,
>             Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
> org.apache.flink.api.common.state.ListState
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>   ... 9 more
> {code}
> This is cryptic, as it does not explain the reason for the problem. The 
> error message should be something along the lines of "Duplicate state name".





[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266871#comment-16266871
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5032
  
We could check based on the state descriptor.







[jira] [Updated] (FLINK-8158) Rowtime window inner join emits late data

2017-11-27 Thread Hequn Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-8158:
---
Description: 
When executing the join, the join operator needs to make sure that no late data 
is emitted. Currently, this is achieved by holding back watermarks. However, the 
window border is not handled correctly. For the SQL below: 
{quote}
val sqlQuery =
  """
SELECT t2.key, t2.id, t1.id
FROM T1 as t1 join T2 as t2 ON
  t1.key = t2.key AND
  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
t2.rt + INTERVAL '1' SECOND
""".stripMargin

val data1 = new mutable.MutableList[(String, String, Long)]
// for boundary test
data1.+=(("A", "LEFT1", 6000L))

val data2 = new mutable.MutableList[(String, String, Long)]
data2.+=(("A", "RIGHT1", 6000L))
{quote}

The join will output a watermark with timestamp 1000, but if the left side 
receives another record ("A", "LEFT1", 1000L), the join will output a record 
with timestamp 1000, which equals the previous watermark.

  was:
When executing the join, the join operator needs to make sure that no late data 
is emitted. Currently, this is achieved by holding back watermarks. However, the 
window border is not handled correctly. For the SQL below: 
{quote}
val sqlQuery =
  """
|SELECT t2.key, t2.id, t1.id
|FROM T1 as t1 join T2 as t2 ON
|  t1.key = t2.key AND
|  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
|t2.rt + INTERVAL '1' SECOND
|""".stripMargin

val data1 = new mutable.MutableList[(String, String, Long)]
// for boundary test
data1.+=(("A", "LEFT1", 6000L))

val data2 = new mutable.MutableList[(String, String, Long)]
data2.+=(("A", "RIGHT1", 6000L))
{quote}

The join will output a watermark with timestamp 1000, but if the left side 
later receives another record ("A", "LEFT1", 1000L), the join will output a 
record with timestamp 1000, which equals the previous watermark.


> Rowtime window inner join emits late data
> -
>
> Key: FLINK-8158
> URL: https://issues.apache.org/jira/browse/FLINK-8158
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> When executing the join, the join operator needs to make sure that no late 
> data is emitted. Currently, this is achieved by holding back watermarks. 
> However, the window border is not handled correctly. For the SQL below: 
> {quote}
> val sqlQuery =
>   """
> SELECT t2.key, t2.id, t1.id
> FROM T1 as t1 join T2 as t2 ON
>   t1.key = t2.key AND
>   t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
> t2.rt + INTERVAL '1' SECOND
> """.stripMargin
> val data1 = new mutable.MutableList[(String, String, Long)]
> // for boundary test
> data1.+=(("A", "LEFT1", 6000L))
> val data2 = new mutable.MutableList[(String, String, Long)]
> data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if the left side 
> later receives another record ("A", "LEFT1", 1000L), the join will output a 
> record with timestamp 1000, which equals the previous watermark.
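The boundary problem described above can be made concrete with a small, hypothetical helper. This is only a sketch of the hold-back arithmetic, not Flink's actual join operator: for a bound like `t1.rt BETWEEN t2.rt - 5s AND t2.rt + 1s`, the emitted watermark must trail the input watermark by the maximum relative window size plus one millisecond, so that a record emitted right on the window border is still strictly ahead of the last watermark.

```java
public class WatermarkHoldback {

    // Hypothetical helper: compute the watermark the join may safely emit.
    // A record arriving on one side can still join with records whose rowtime
    // is up to max(lowerBound, upperBound) smaller, so the watermark is held
    // back by that amount, plus 1 ms so that emitted records are strictly
    // ahead of the emitted watermark (fixing the off-by-one on the border).
    static long heldBackWatermark(long inputWatermark, long lowerBoundMs, long upperBoundMs) {
        long maxRelativeSize = Math.max(lowerBoundMs, upperBoundMs);
        return inputWatermark - maxRelativeSize - 1;
    }

    public static void main(String[] args) {
        // With the 5s/1s bounds from the SQL above and input watermark 6000:
        long wm = heldBackWatermark(6000L, 5000L, 1000L);
        // Without the extra -1 the join would emit watermark 1000, and a later
        // join result with timestamp 1000 would already count as late data.
        System.out.println(wm); // 999
    }
}
```

With this hold-back, the late-arriving record ("A", "LEFT1", 1000L) produces output with timestamp 1000, which is still ahead of the emitted watermark 999.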





[jira] [Commented] (FLINK-8022) Kafka at-least-once tests fail occasionally

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266707#comment-16266707
 ] 

ASF GitHub Bot commented on FLINK-8022:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5078
  
Thanks!


> Kafka at-least-once tests fail occasionally
> ---
>
> Key: FLINK-8022
> URL: https://issues.apache.org/jira/browse/FLINK-8022
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}}
>  seems to sporadically fail with missing data, like this execution:
> {code}
> 
> Test 
> testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  is running.
> 
> 17:54:30,195 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Creating topic oneToOneTopicRegularSink
> 17:54:30,196 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - In 
> getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436
> 17:54:30,204 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Starting ZkClient event thread.
> 17:54:30,204 INFO  org.I0Itec.zkclient.ZkClient   
>- Waiting for keeper state SyncConnected
> 17:54:30,240 INFO  org.I0Itec.zkclient.ZkClient   
>- zookeeper state changed (SyncConnected)
> 17:54:30,261 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Terminate ZkClient event thread.
> 17:54:30,265 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - Topic 
> oneToOneTopicRegularSink create request is successfully posted
> 17:54:30,366 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - 
> Validating if the topic oneToOneTopicRegularSink has been created or not
> 17:54:30,373 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - In 
> getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436
> 17:54:30,374 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Starting ZkClient event thread.
> 17:54:30,374 INFO  org.I0Itec.zkclient.ZkClient   
>- Waiting for keeper state SyncConnected
> 17:54:30,404 INFO  org.I0Itec.zkclient.ZkClient   
>- zookeeper state changed (SyncConnected)
> 17:54:30,420 INFO  
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl  - topic 
> oneToOneTopicRegularSink has been created successfully
> 17:54:30,421 INFO  org.I0Itec.zkclient.ZkEventThread  
>- Terminate ZkClient event thread.
> 17:54:31,099 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - 
> Starting FlinkKafkaProducer (1/1) to produce into default topic 
> oneToOneTopicRegularSink
> 17:55:05,229 ERROR 
> org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase  - 
> 
> Test 
> testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase)
>  failed with:
> java.lang.AssertionError: Expected to contain all of: <[0, 1, 2, 3, 4, 5, 6, 
> 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 
> 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 
> 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 
> 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 
> 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 
> 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 
> 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 
> 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 
> 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 
> 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 
> 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 
> 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 
> 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 
> 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 
> 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 

[jira] [Commented] (FLINK-8122) Name all table sinks and sources

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266731#comment-16266731
 ] 

ASF GitHub Bot commented on FLINK-8122:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5068#discussion_r153182297
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -60,4 +60,14 @@ trait TableSource[T] {
 */
   def explainSource(): String = ""
 
+  /**
+* Gets the name which is used by the visualization and logging during runtime.
+*
+* @return Name of the [[TableSource]].
+*/
+  def getRuntimeName(): String = {
--- End diff --

Shall I reserve the existing `explainSource()`  implementations or use a 
unified one?


> Name all table sinks and sources
> 
>
> Key: FLINK-8122
> URL: https://issues.apache.org/jira/browse/FLINK-8122
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>
> Not all table sinks and sources have proper names. Therefore, they are 
> displayed as "Unnamed" in the logs and Web UI (e.g. CsvTableSink). We should 
> add names for all built-in connectors. Having information about the table 
> sink name (via {{INSERT INTO}}) would be even better.
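A default runtime name could be derived from the connector class and its schema, as in the following sketch (a hypothetical helper for illustration; Flink's actual naming utility and signature may differ):

```java
public class RuntimeNames {

    // Hypothetical helper: derive a display name like "CsvTableSource(key, id)"
    // from the connector class and its field names, instead of "Unnamed".
    static String runtimeName(Class<?> clazz, String[] fieldNames) {
        return clazz.getSimpleName() + "(" + String.join(", ", fieldNames) + ")";
    }

    public static void main(String[] args) {
        System.out.println(runtimeName(RuntimeNames.class, new String[]{"key", "id"}));
        // RuntimeNames(key, id)
    }
}
```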





[GitHub] flink pull request #5080: [FLINK-8159] [cep] Add rich support for SelectWrap...

2017-11-27 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5080

[FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper

## What is the purpose of the change

*This pull request adds rich function support to SelectWrapper and 
FlatSelectWrapper. If the wrapped functions are rich functions, they should be 
processed correctly.*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): ( no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink SelectFunction

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5080.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5080


commit 4c3ccb008b38d44189578975b5eee9208561567b
Author: Dian Fu 
Date:   2017-11-27T12:50:30Z

[FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper




---


[GitHub] flink pull request #5077: [docs] fix wrong package name

2017-11-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5077


---


[jira] [Commented] (FLINK-8143) Port SubtaskMetricsHandler to new REST endpoint

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266771#comment-16266771
 ] 

ASF GitHub Bot commented on FLINK-8143:
---

Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/5079


> Port SubtaskMetricsHandler to new REST endpoint
> ---
>
> Key: FLINK-8143
> URL: https://issues.apache.org/jira/browse/FLINK-8143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
>
> Migrate logic in 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler}}
>  to a handler that extends 
> {{org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}}. 
> Register new handler to {{DispatcherRestEndpoint}}.





[jira] [Commented] (FLINK-8143) Port SubtaskMetricsHandler to new REST endpoint

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266772#comment-16266772
 ] 

ASF GitHub Bot commented on FLINK-8143:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5082

[FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler` 
to a new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that 
SubtaskMetrics can be queried in FLIP-6 standalone mode.*
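The manual `curl` check can be reproduced with a small helper that builds the metrics path. This is a sketch; the endpoint layout is assumed from Flink's REST API, not taken from this message:

```java
public class MetricsUrls {

    // Hypothetical helper building the REST path used for the manual curl
    // checks, e.g. curl "http://localhost:8081$PATH?get=numRecordsIn".
    static String subtaskMetricsPath(String jobId, String vertexId, int subtaskIndex) {
        return String.format("/jobs/%s/vertices/%s/subtasks/%d/metrics",
                jobId, vertexId, subtaskIndex);
    }

    public static void main(String[] args) {
        System.out.println(subtaskMetricsPath("abc123", "v1", 0));
        // /jobs/abc123/vertices/v1/subtasks/0/metrics
    }
}
```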

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8143

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5082


commit d32d978e1826df6fab8f5b4a27d47b4367d54ece
Author: gyao 
Date:   2017-11-27T12:30:18Z

[FLINK-8150][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

Migrate logic from
org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler 
to
new handler. Add new handler to DispatcherRestEndpoint.

commit 96c9f671d6320d5d1d014d5df27f4a3a07417993
Author: gyao 
Date:   2017-11-27T12:32:52Z

[hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler

commit c369d733fecc71a7581b5a6cbf196b35ecdfed12
Author: gyao 
Date:   2017-11-27T12:34:02Z

[hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest




> Port SubtaskMetricsHandler to new REST endpoint
> ---
>
> Key: FLINK-8143
> URL: https://issues.apache.org/jira/browse/FLINK-8143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
>
> Migrate logic in 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler}}
>  to a handler that extends 
> {{org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}}. 
> Register new handler to {{DispatcherRestEndpoint}}.





[GitHub] flink pull request #5068: [FLINK-8122] [table] Name all built-in table sinks...

2017-11-27 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5068#discussion_r153182297
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -60,4 +60,14 @@ trait TableSource[T] {
 */
   def explainSource(): String = ""
 
+  /**
+* Gets the name which is used by the visualization and logging during runtime.
+*
+* @return Name of the [[TableSource]].
+*/
+  def getRuntimeName(): String = {
--- End diff --

Shall I reserve the existing `explainSource()`  implementations or use a 
unified one?


---


[GitHub] flink issue #5079: [FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new ...

2017-11-27 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5079
  
My branch name is wrong. The commit messages are right. Let me know if this 
is a problem.


---


[jira] [Commented] (FLINK-8143) Port SubtaskMetricsHandler to new REST endpoint

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266768#comment-16266768
 ] 

ASF GitHub Bot commented on FLINK-8143:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5079
  
My branch name is wrong. The commit messages are right. Let me know if this 
is a problem.


> Port SubtaskMetricsHandler to new REST endpoint
> ---
>
> Key: FLINK-8143
> URL: https://issues.apache.org/jira/browse/FLINK-8143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
>
> Migrate logic in 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler}}
>  to a handler that extends 
> {{org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}}. 
> Register new handler to {{DispatcherRestEndpoint}}.





[GitHub] flink pull request #5084: [FLINK-7694][flip6] Migrate JobMetricsHandler to n...

2017-11-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5084

[FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler` 
to a new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that Job 
metrics can be queried in FLIP-6 standalone mode.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7694

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5084.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5084


commit 7a3bdc5a8c2c53a9b4219d1fab7faf5490d1ef92
Author: gyao 
Date:   2017-11-27T13:34:03Z

[FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint

Migrate logic in
org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler to new
handler, and add new handler to DispatcherRestEndpoint.




---


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266819#comment-16266819
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5084

[FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler` 
to a new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that Job 
metrics can be queried in FLIP-6 standalone mode.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7694

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5084.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5084


commit 7a3bdc5a8c2c53a9b4219d1fab7faf5490d1ef92
Author: gyao 
Date:   2017-11-27T13:34:03Z

[FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint

Migrate logic in
org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler to new
handler, and add new handler to DispatcherRestEndpoint.




> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Gary Yao
>  Labels: flip6
> Fix For: 1.5.0
>
>
> Port 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} to 
> new handler that works with {{RestServerEndpoint}}. Add new handler to 
> {{DispatcherRestEndpoint}}.





[GitHub] flink pull request #5078: [FLINK-8022][kafka] Bump at-least-once timeout in ...

2017-11-27 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5078

[FLINK-8022][kafka] Bump at-least-once timeout in tests

Increasing the timeout for reading the records from 30s to 60s seems to solve 
the issue for the failing at-least-once tests.

This is a minor fix in tests, intended to increase test stability.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8022

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5078.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5078


commit 31a0f9ecb6f39210e2b7dacee9bcf90f44ceee1f
Author: Piotr Nowojski 
Date:   2017-11-23T11:53:37Z

[FLINK-8022][kafka] Bump at-least-once timeout in tests

Increasing the timeout for reading the records from 30s to 60s seems to solve 
the issue for the failing at-least-once tests.
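The pattern behind the timed test read can be sketched as follows. This is a hypothetical simplification: the real test reads from Kafka, while here a `Supplier` stands in for the consumer. Bumping the timeout (30s to 60s) only widens the deadline; the completeness assertion stays the same.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class ReadWithDeadline {

    // Keep polling batches until every expected element has been seen or the
    // deadline passes; return whatever was seen so the test can assert on it.
    static Set<Integer> readUntil(Supplier<List<Integer>> poll,
                                  Set<Integer> expected,
                                  long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        Set<Integer> seen = new HashSet<>();
        while (System.currentTimeMillis() < deadline && !seen.containsAll(expected)) {
            seen.addAll(poll.get());
        }
        return seen;
    }

    public static void main(String[] args) {
        // An in-memory stand-in for the Kafka consumer, yielding two batches.
        Iterator<List<Integer>> batches =
                List.of(List.of(0, 1), List.of(2, 3)).iterator();
        Set<Integer> seen = readUntil(
                () -> batches.hasNext() ? batches.next() : List.of(),
                Set.of(0, 1, 2, 3),
                60_000L);
        System.out.println(seen.containsAll(Set.of(0, 1, 2, 3))); // true
    }
}
```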




---


[GitHub] flink issue #5078: [FLINK-8022][kafka] Bump at-least-once timeout in tests

2017-11-27 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5078
  
Thanks!


---


[jira] [Commented] (FLINK-8159) Add rich support for SelectWrapper and FlatSelectWrapper

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266750#comment-16266750
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5080

[FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper

## What is the purpose of the change

*This pull request adds rich function support to SelectWrapper and 
FlatSelectWrapper. If the wrapped functions are rich functions, they should be 
processed correctly.*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): ( no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink SelectFunction

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5080.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5080


commit 4c3ccb008b38d44189578975b5eee9208561567b
Author: Dian Fu 
Date:   2017-11-27T12:50:30Z

[FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper




> Add rich support for SelectWrapper and FlatSelectWrapper
> 
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extend 
> {{AbstractRichFunction}} and behave properly if the underlying functions 
> extend {{RichFunction}}.
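The delegation pattern for such wrappers can be sketched as below. The classes here are hypothetical, minimal stand-ins for Flink's `Function`/`AbstractRichFunction` interfaces, reduced to the lifecycle forwarding: the wrapper is itself "rich" and forwards `open()`/`close()` to the wrapped function only if that function is rich, too.

```java
public class RichWrapperSketch {

    // Hypothetical stand-ins; the real Flink interfaces differ.
    interface Function {}

    static class RichFunction implements Function {
        boolean opened;
        void open() { opened = true; }
        void close() { opened = false; }
    }

    // The wrapper forwards lifecycle calls to the wrapped function
    // only when the wrapped function is rich.
    static class SelectWrapper extends RichFunction {
        private final Function wrapped;

        SelectWrapper(Function wrapped) { this.wrapped = wrapped; }

        @Override
        void open() {
            super.open();
            if (wrapped instanceof RichFunction) {
                ((RichFunction) wrapped).open();
            }
        }

        @Override
        void close() {
            if (wrapped instanceof RichFunction) {
                ((RichFunction) wrapped).close();
            }
            super.close();
        }
    }

    public static void main(String[] args) {
        RichFunction inner = new RichFunction();
        SelectWrapper wrapper = new SelectWrapper(inner);
        wrapper.open();
        System.out.println(inner.opened); // true
    }
}
```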





[jira] [Assigned] (FLINK-8160) Extend OperatorHarness to expose metrics

2017-11-27 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-8160:
---

Assignee: (was: Chesnay Schepler)

> Extend OperatorHarness to expose metrics
> 
>
> Key: FLINK-8160
> URL: https://issues.apache.org/jira/browse/FLINK-8160
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Streaming
>Reporter: Chesnay Schepler
> Fix For: 1.5.0
>
>
> To better test interactions between operators and metrics, the harness should 
> expose the metrics registered by the operator.





[GitHub] flink pull request #5083: [FLINK-7716][flip6] Migrate JobManagerMetricsHandl...

2017-11-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5083

[FLINK-7716][flip6] Migrate JobManagerMetricsHandler to new 
RestServerEndpoint

## What is the purpose of the change

*FLIP-6 efforts: Migrating HTTP handlers*

## Brief change log
  - Migrate logic from 
`org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler` 
to a new handler.
  - Add new handler to `DispatcherRestEndpoint`.

## Verifying this change

  - *Added unit tests for all new classes and modified existing classes 
except for DispatcherRestEndpoint.*
  - *Manually deployed a job locally and verified with `curl` that 
JobManager metrics can be queried in FLIP-6 standalone mode.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7716

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5083


commit 17244d63fcc1610c83a5c020a63fe045087a2f07
Author: gyao 
Date:   2017-11-27T13:18:48Z

[FLINK-7716][flip6] Migrate JobManagerMetricsHandler to new RestServerEndpoint

Migrate logic in

org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler
to new handler and add new handler to DispatcherRestEndpoint.

commit 51d37ccce04b912dcda6f5ae74d8092d22b5ff9d
Author: gyao 
Date:   2017-11-27T13:22:17Z

[FLINK-7716][Javadoc] Deprecate method MetricStore#getJobManager().

There is a semantically equivalent method in MetricStore.

commit c7283194b19719270e4c0c2cc185876e710b750a
Author: gyao 
Date:   2017-11-27T13:24:42Z

[hotfix][Javadoc] Fix typo in ConversionException




---


[GitHub] flink issue #4581: [FLINK-7499][io] fix double buffer release in SpillableSu...

2017-11-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4581
  
Still failing with some checkstyle violations in `ResultPartition.java`


---


[jira] [Commented] (FLINK-8154) JobSubmissionClientActor submitted job, but there is no connection to a JobManager

2017-11-27 Thread Gregory Melekh (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266949#comment-16266949
 ] 

Gregory Melekh commented on FLINK-8154:
---

Ohhh... you are right.
I should create a headless service to route messages to the JobManager Pod.

Thanks
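Such a headless Service could look like the following sketch. The service name and ports come from the configuration quoted in this issue (jobmanager.rpc.address: act-monitor-flink-jobmanager, RPC port 6123, blob port 6124); the selector labels are assumptions, not taken from this thread.

```yaml
# Hypothetical headless Service matching jobmanager.rpc.address;
# selector labels must match those of the JobManager deployment.
apiVersion: v1
kind: Service
metadata:
  name: act-monitor-flink-jobmanager
spec:
  clusterIP: None        # headless: DNS resolves directly to the pod IP
  selector:
    app: flink
    component: jobmanager
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
```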

> JobSubmissionClientActor submitted job, but there is no connection to a 
> JobManager
> -
>
> Key: FLINK-8154
> URL: https://issues.apache.org/jira/browse/FLINK-8154
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.2
> Environment: Kubernetes 1.8.3, Platform "Linux/amd64"
>Reporter: Gregory Melekh
>Priority: Blocker
>
> There is JobManager log file bellow.
> 2017-11-26 08:17:13,435 INFO  org.apache.flink.client.CliFrontend 
>   - 
> 
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Starting Command Line Client (Version: 1.3.2, Rev:0399bee, 
> Date:03.08.2017 @ 10:23:11 UTC)
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Current user: root
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 
> 1.8/25.131-b11
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Maximum heap size: 6252 MiBytes
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2017-11-26 08:17:13,439 INFO  org.apache.flink.client.CliFrontend 
>   -  Hadoop version: 2.7.2
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  JVM Options:
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - 
> -Dlog.file=/opt/flink/log/flink--client-act-monitor-flink-jobmanager-66cd4bdb5c-8kxbh.log
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -Dlog4j.configuration=file:/etc/flink/log4j-cli.properties
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -Dlogback.configurationFile=file:/etc/flink/logback.xml
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  Program Arguments:
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - run
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -c
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - monitoring.flow.AccumulateAll
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - /tmp/monitoring-0.0.1-SNAPSHOT.jar
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  Classpath: 
> /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar:::
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - 
> 
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - Using configuration directory /etc/flink
> 2017-11-26 08:17:13,441 INFO  org.apache.flink.client.CliFrontend 
>   - Trying to load configuration file
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: blob.server.port, 6124
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.address, act-monitor-flink-jobmanager
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.web.log.path, 
> /etc/flink/log/act-monitor-flink-jobmanager.log
> 2017-11-26 08:17:13,444 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration 

[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2017-11-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153241166
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
+import org.apache.flink.runtime.checkpoint.CheckpointCache;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both a remote end (e.g. DFS) and a local end.
+ * Local data is managed by {@link CheckpointCache}. This factory simply wraps a {@link CheckpointCache} and a {@link CheckpointStreamFactory}
+ * and creates a hybrid output stream from them; the hybrid output stream writes
+ * to both the remote end and the local end.
+ */
+public class CachedCheckpointStreamFactory implements 
CheckpointStreamFactory {
+
+   private static Logger LOG = 
LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
+
+   private final CheckpointCache cache;
+   private final CheckpointStreamFactory remoteFactory;
+
+   public CachedCheckpointStreamFactory(CheckpointCache cache, 
CheckpointStreamFactory factory) {
+   this.cache = cache;
+   this.remoteFactory = Preconditions.checkNotNull(factory, 
"Remote stream factory is null.");
+   }
+
+   public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp, 
StateHandleID handleID) throws Exception {
+   return createCheckpointStateOutputStream(checkpointID, 
timestamp, handleID, false);
+   }
+
+   public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp, 
StateHandleID handleID, boolean placeholder) throws Exception {
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("create cache output stream: cpkID:{} 
placeHolder:{}", checkpointID, placeholder);
+   }
+   CachedOutputStream cachedOut = null;
+   if (cache != null) {
+   cachedOut = cache.createOutputStream(checkpointID, 
handleID, placeholder);
+   }
+   CheckpointStateOutputStream remoteOut = null;
+   if (!placeholder) {
+   remoteOut = 
remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+   }
+   CachedCheckpointStateOutputStream output = new 
CachedCheckpointStateOutputStream(cachedOut, remoteOut);
+   return output;
+   }
+
+   @Override
+   public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
+   LOG.warn("create output stream which is not cacheable.");
+   return 
remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+   }
+
+   @Override
+   public void close() throws Exception {
+   remoteFactory.close();
+   }
+
+   /**
+* A hybrid checkpoint output stream that writes data to both the remote
+* end and the local end; a failure to write locally won't stop writing to
+* the remote end. This hybrid output stream returns a
+* {@link CachedStreamStateHandle} from closeAndGetHandle(), which can be
+* used to read data locally.
+*/
+   public static class CachedCheckpointStateOutputStream extends 
CheckpointStateOutputStream {
+
+   private CachedOutputStream cacheOut = null;
+   private CheckpointStateOutputStream remoteOut = null;
+
+

[jira] [Commented] (FLINK-8161) Flakey YARNSessionCapacitySchedulerITCase on Travis

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267023#comment-16267023
 ] 

ASF GitHub Bot commented on FLINK-8161:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5085

[FLINK-8161] [tests] Harden YARNSessionCapacitySchedulerITCase

## What is the purpose of the change

Add "Remote connection to [null] failed with 
java.nio.channels.NotYetConnectedException"
to the list of whitelisted log statements in YarnTestBase. This logging 
statement
seems to appear since we moved from Flakka to Akka 2.4.0.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
fixYarnSessionCapacitySchedulerITCase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5085


commit eede21bc1ee335a913d1a49d82d73ce1d0cfe3aa
Author: Till Rohrmann 
Date:   2017-11-27T16:31:28Z

[FLINK-8161] [tests] Harden YARNSessionCapacitySchedulerITCase

Add "Remote connection to [null] failed with 
java.nio.channels.NotYetConnectedException"
to the list of whitelisted log statements in YarnTestBase. This logging 
statement
seems to appear since we moved from Flakka to Akka 2.4.0.




> Flakey YARNSessionCapacitySchedulerITCase on Travis
> ---
>
> Key: FLINK-8161
> URL: https://issues.apache.org/jira/browse/FLINK-8161
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> The {{YARNSessionCapacitySchedulerITCase}} spuriously fails on Travis because 
> it now contains {{2017-11-25 22:49:49,204 WARN  
> akka.remote.transport.netty.NettyTransport- Remote 
> connection to [null] failed with java.nio.channels.NotYetConnectedException}} 
> from time to time in the logs. I suspect that this is due to switching from 
> Flakka to Akka 2.4.0. In order to solve this problem I propose to add this 
> log statement to the whitelisted log statements.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5085: [FLINK-8161] [tests] Harden YARNSessionCapacitySch...

2017-11-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5085

[FLINK-8161] [tests] Harden YARNSessionCapacitySchedulerITCase

## What is the purpose of the change

Add "Remote connection to [null] failed with 
java.nio.channels.NotYetConnectedException"
to the list of whitelisted log statements in YarnTestBase. This logging 
statement
seems to appear since we moved from Flakka to Akka 2.4.0.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
fixYarnSessionCapacitySchedulerITCase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5085


commit eede21bc1ee335a913d1a49d82d73ce1d0cfe3aa
Author: Till Rohrmann 
Date:   2017-11-27T16:31:28Z

[FLINK-8161] [tests] Harden YARNSessionCapacitySchedulerITCase

Add "Remote connection to [null] failed with 
java.nio.channels.NotYetConnectedException"
to the list of whitelisted log statements in YarnTestBase. This logging 
statement
seems to appear since we moved from Flakka to Akka 2.4.0.




---


[GitHub] flink pull request #5089: [FLINK-8088] Associate logical slots with the slot...

2017-11-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5089

[FLINK-8088] Associate logical slots with the slot request id

## What is the purpose of the change

Before, logical slots like the `SimpleSlot` and `SharedSlot` were associated
with the actually allocated slot via the `AllocationID`. This, however, was
sub-optimal because allocated slots can be re-used to fulfill other slot
requests (logical slots) as well. Therefore, we should bind the logical slots
to an id with the right lifecycle, which is the `SlotRequestID`.

This PR is based on #5088.

## Brief change log

- Introduce `SlotRequestID`
- Associate logical slot requests with `SlotRequestID` which is valid over 
the lifetime of a logical slot

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink slotAssociation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5089


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove isCanceled, isReleased method and decouple logical slot from 
Execution by
introducing a Payload interface which is set for a LogicalSlot. The Payload 
interface
is implemented by the Execution and allows to fail an implementation and 
obtaining
a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the 
SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by 
the
SlotPool.

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088] Associate logical slots with the slot request id

Before, logical slots like the SimpleSlot and SharedSlot were associated with
the actually allocated slot via the AllocationID. This, however, was
sub-optimal because allocated slots can be re-used to fulfill other slot
requests (logical slots) as well. Therefore, we should bind the logical slots
to an id with the right lifecycle, which is the slot request id.




---


[jira] [Commented] (FLINK-8088) Bind logical slots to their request id instead of the slot allocation id

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267094#comment-16267094
 ] 

ASF GitHub Bot commented on FLINK-8088:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5089

[FLINK-8088] Associate logical slots with the slot request id

## What is the purpose of the change

Before, logical slots like the `SimpleSlot` and `SharedSlot` were associated
with the actually allocated slot via the `AllocationID`. This, however, was
sub-optimal because allocated slots can be re-used to fulfill other slot
requests (logical slots) as well. Therefore, we should bind the logical slots
to an id with the right lifecycle, which is the `SlotRequestID`.

This PR is based on #5088.

## Brief change log

- Introduce `SlotRequestID`
- Associate logical slot requests with `SlotRequestID` which is valid over 
the lifetime of a logical slot

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink slotAssociation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5089


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove isCanceled, isReleased method and decouple logical slot from 
Execution by
introducing a Payload interface which is set for a LogicalSlot. The Payload 
interface
is implemented by the Execution and allows to fail an implementation and 
obtaining
a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the 
SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by 
the
SlotPool.

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088] Associate logical slots with the slot request id

Before, logical slots like the SimpleSlot and SharedSlot were associated with
the actually allocated slot via the AllocationID. This, however, was
sub-optimal because allocated slots can be re-used to fulfill other slot
requests (logical slots) as well. Therefore, we should bind the logical slots
to an id with the right lifecycle, which is the slot request id.




> Bind logical slots to their request id instead of the slot allocation id
> 
>
> Key: FLINK-8088
> URL: https://issues.apache.org/jira/browse/FLINK-8088
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> Since allocated slots can be reused to fulfil multiple slot requests, we 
> should bind the resulting logical slots to their slot request id instead of 
> the allocation id of the underlying allocated slot.



--
This message was sent by Atlassian JIRA
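The re-binding that FLINK-8088 describes can be sketched as follows. This is an illustrative sketch only — the class and method names (`SlotRequestSketch`, `allocate`, `release`) are hypothetical stand-ins, not Flink's actual `SlotPool` API — but it shows why keying logical slots by a per-request id lets one allocated slot (one allocation id) back several logical slots, each released via its own id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: logical slots are keyed by a per-request id, so a
// single allocated slot (one allocation id) can back several logical slots.
public class SlotRequestSketch {

    static final class SlotRequestId { final UUID id = UUID.randomUUID(); }
    static final class AllocationId { final UUID id = UUID.randomUUID(); }

    static final class LogicalSlot {
        final SlotRequestId requestId;
        final AllocationId backingAllocation;
        LogicalSlot(SlotRequestId r, AllocationId a) {
            requestId = r;
            backingAllocation = a;
        }
    }

    private final Map<SlotRequestId, LogicalSlot> logicalSlots = new HashMap<>();

    LogicalSlot allocate(AllocationId allocation) {
        SlotRequestId requestId = new SlotRequestId();
        LogicalSlot slot = new LogicalSlot(requestId, allocation);
        // The logical slot lives and dies with its request id, not the
        // allocation id of the underlying allocated slot.
        logicalSlots.put(requestId, slot);
        return slot;
    }

    boolean release(SlotRequestId requestId) {
        return logicalSlots.remove(requestId) != null;
    }

    public static void main(String[] args) {
        SlotRequestSketch pool = new SlotRequestSketch();
        AllocationId shared = new AllocationId();
        // Two logical slots re-use the same allocation but have distinct request ids.
        LogicalSlot a = pool.allocate(shared);
        LogicalSlot b = pool.allocate(shared);
        System.out.println(a.requestId == b.requestId);                 // false
        System.out.println(a.backingAllocation == b.backingAllocation); // true
        System.out.println(pool.release(a.requestId));                  // true
        System.out.println(pool.release(a.requestId));                  // false (already released)
    }
}
```

Releasing one logical slot by its request id leaves the other, and the shared allocation, untouched — which is exactly what keying by allocation id could not express.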

[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266992#comment-16266992
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153241166
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java
 ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
+import org.apache.flink.runtime.checkpoint.CheckpointCache;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both a remote end (e.g. DFS) and a local end.
+ * Local data is managed by {@link CheckpointCache}. This factory simply wraps a {@link CheckpointCache} and a {@link CheckpointStreamFactory}
+ * and creates a hybrid output stream from them; the hybrid output stream writes
+ * to both the remote end and the local end.
+ */
+public class CachedCheckpointStreamFactory implements 
CheckpointStreamFactory {
+
+   private static Logger LOG = 
LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
+
+   private final CheckpointCache cache;
+   private final CheckpointStreamFactory remoteFactory;
+
+   public CachedCheckpointStreamFactory(CheckpointCache cache, 
CheckpointStreamFactory factory) {
+   this.cache = cache;
+   this.remoteFactory = Preconditions.checkNotNull(factory, 
"Remote stream factory is null.");
+   }
+
+   public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp, 
StateHandleID handleID) throws Exception {
+   return createCheckpointStateOutputStream(checkpointID, 
timestamp, handleID, false);
+   }
+
+   public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp, 
StateHandleID handleID, boolean placeholder) throws Exception {
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("create cache output stream: cpkID:{} 
placeHolder:{}", checkpointID, placeholder);
+   }
+   CachedOutputStream cachedOut = null;
+   if (cache != null) {
+   cachedOut = cache.createOutputStream(checkpointID, 
handleID, placeholder);
+   }
+   CheckpointStateOutputStream remoteOut = null;
+   if (!placeholder) {
+   remoteOut = 
remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+   }
+   CachedCheckpointStateOutputStream output = new 
CachedCheckpointStateOutputStream(cachedOut, remoteOut);
+   return output;
+   }
+
+   @Override
+   public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
+   LOG.warn("create output stream which is not cacheable.");
+   return 
remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+   }
+
+   @Override
+   public void close() throws Exception {
+   remoteFactory.close();
+   }
+
+   /**
+* A hybrid checkpoint output stream that writes data to both the remote
+* end and the local end; a failure to write locally won't stop writing to
+* the remote end. This hybrid output stream returns a
+* {@link CachedStreamStateHandle} from closeAndGetHandle(), which can be
+* used to read data locally.
+*/
+   

[GitHub] flink pull request #5086: [FLINK-8078] Introduce LogicalSlot interface

2017-11-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5086

[FLINK-8078] Introduce LogicalSlot interface

## What is the purpose of the change

The LogicalSlot interface decouples the task deployment from the actual 
slot implementation which at the moment is `SimpleSlot`.  This is a helpful 
step to introduce a different slot implementation for Flip-6.

## Brief change log

- Introduce `LogicalSlot`
- Replace `SimpleSlot` usage by `LogicalSlot`
- Let `SimpleSlot` implement the `LogicalSlot` interface

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink generalizeSlot

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5086.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5086


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.




---


[jira] [Commented] (FLINK-8078) Decouple Execution from actual slot implementation

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267070#comment-16267070
 ] 

ASF GitHub Bot commented on FLINK-8078:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5086

[FLINK-8078] Introduce LogicalSlot interface

## What is the purpose of the change

The LogicalSlot interface decouples the task deployment from the actual 
slot implementation which at the moment is `SimpleSlot`.  This is a helpful 
step to introduce a different slot implementation for Flip-6.

## Brief change log

- Introduce `LogicalSlot`
- Replace `SimpleSlot` usage by `LogicalSlot`
- Let `SimpleSlot` implement the `LogicalSlot` interface

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink generalizeSlot

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5086.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5086


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.




> Decouple Execution from actual slot implementation
> --
>
> Key: FLINK-8078
> URL: https://issues.apache.org/jira/browse/FLINK-8078
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to plug in a different slot implementation, we should introduce a 
> slot interface which abstracts away the implementation details of 
> {{SimpleSlot}} wrt {{Execution}}. The reason this is necessary is to provide 
> a simpler slot implementation for Flip-6 since all allocation/release logic 
> will go through the {{SlotPool}}. Thus, we no longer need the concurrent 
> structure of {{Slot}}, {{SharedSlot}}, {{SimpleSlot}} and 
> {{SlotSharingGroupAssignment}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
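The decoupling that FLINK-8078 describes can be sketched with a narrow interface between the Execution and the slot. This is a simplified illustration, not Flink's actual `LogicalSlot` API — the method names and the `Payload` shape here are assumptions based on the commit messages above — but it shows how the deployment side can depend only on the interface while concrete slot implementations are swapped behind it.

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for a logical-slot abstraction: the deployment code
// talks to this interface, never to a concrete slot class.
interface LogicalSlot {

    boolean isAlive();

    CompletableFuture<?> releaseSlot();

    // The payload (e.g. an Execution) can be failed and observed for
    // termination without the slot knowing the concrete class.
    interface Payload {
        void fail(Throwable cause);
        CompletableFuture<?> getTerminalStateFuture();
    }

    boolean tryAssignPayload(Payload payload);
}

public class LogicalSlotSketch {

    // A trivial implementation just to show the decoupling in action.
    static class TestingSlot implements LogicalSlot {
        private Payload payload;

        public boolean isAlive() { return true; }

        public CompletableFuture<?> releaseSlot() {
            // Releasing the slot fails its payload; the slot never needs to
            // know that the payload is an Execution.
            if (payload != null) {
                payload.fail(new Exception("slot released"));
            }
            return CompletableFuture.completedFuture(null);
        }

        public boolean tryAssignPayload(Payload p) {
            if (payload != null) {
                return false; // at most one payload per logical slot
            }
            payload = p;
            return true;
        }
    }

    public static void main(String[] args) {
        LogicalSlot slot = new TestingSlot();
        CompletableFuture<Void> terminated = new CompletableFuture<>();
        boolean assigned = slot.tryAssignPayload(new LogicalSlot.Payload() {
            public void fail(Throwable cause) { terminated.complete(null); }
            public CompletableFuture<?> getTerminalStateFuture() { return terminated; }
        });
        System.out.println(assigned);            // true
        slot.releaseSlot();
        System.out.println(terminated.isDone()); // true
    }
}
```

A Flip-6 slot backed by the SlotPool can then implement the same interface without any of the concurrent `Slot`/`SharedSlot` machinery.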


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267112#comment-16267112
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153261175
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -152,6 +170,26 @@ public void channelRead(ChannelHandlerContext ctx, 
Object msg) throws Exception
}
}
 
+   @Override
+   public void userEventTriggered(ChannelHandlerContext ctx, Object msg) 
throws Exception {
+   if (msg instanceof RemoteInputChannel) {
+   boolean triggerWrite = 
inputChannelsWithCredit.isEmpty();
--- End diff --

how about some small comment as in `PartitionRequestQueue`? Something like
```
// Queue an input channel for available credits 
announcement.
// If the queue is empty, we try to trigger the actual 
write.
// Otherwise this will be handled by the
// writeAndFlushNextMessageIfPossible calls.
```


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
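The enqueue-and-announce pattern described in the FLINK-7416 bullet points can be sketched as below. The class and method names (`CreditAnnouncerSketch`, `notifyCreditAvailable`) are hypothetical stand-ins — the real logic lives in Flink's netty client handler and is driven by channel writability — but the sketch shows the two invariants from the description: a channel is enqueued only when its unannounced credit rises from zero, and each announcement carries all accumulated credit and resets it.

```java
import java.util.ArrayDeque;

// Illustrative sketch of queue-based credit announcement (names simplified).
public class CreditAnnouncerSketch {

    static final class InputChannel {
        final String name;
        int unannouncedCredit;
        InputChannel(String name) { this.name = name; }
    }

    private final ArrayDeque<InputChannel> channelsWithCredit = new ArrayDeque<>();

    // Called when a channel's unannounced credit goes up from zero.
    void notifyCreditAvailable(InputChannel ch) {
        // If the queue was empty, nothing is in flight, so trigger the write
        // ourselves; otherwise the write loop will drain the queue.
        boolean triggerWrite = channelsWithCredit.isEmpty();
        channelsWithCredit.add(ch);
        if (triggerWrite) {
            writeNextAnnouncementIfPossible();
        }
    }

    // Called whenever the (simulated) transport channel becomes writable.
    void writeNextAnnouncementIfPossible() {
        InputChannel ch = channelsWithCredit.poll();
        if (ch != null) {
            // Send as much credit as is available at this point in time,
            // then reset it to zero.
            System.out.println("AddCredit(" + ch.name + ", " + ch.unannouncedCredit + ")");
            ch.unannouncedCredit = 0;
        }
    }

    public static void main(String[] args) {
        CreditAnnouncerSketch handler = new CreditAnnouncerSketch();
        InputChannel ch = new InputChannel("ch-0");
        ch.unannouncedCredit = 2;              // credit went up from zero
        handler.notifyCreditAvailable(ch);     // announces the accumulated credit
        System.out.println(ch.unannouncedCredit); // reset after the announcement
    }
}
```

Batching all accumulated credit into each message is what keeps announcements from adding latency without increasing throughput, as the description notes.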


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267111#comment-16267111
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153266186
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -539,4 +542,60 @@ static CloseRequest 
readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws
return new CloseRequest();
}
}
+
+   static class AddCredit extends NettyMessage {
--- End diff --

Please add a comment ("incremental credit announcement from the client to 
the server"?).


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267113#comment-16267113
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153269318
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -283,10 +283,13 @@ public String toString() {
// 

 
/**
-* Enqueue this input channel in the pipeline for sending unannounced 
credits to producer.
+* Enqueue this input channel in the pipeline for notifying the 
producer of unannounced credit.
 */
void notifyCreditAvailable() {
-   //TODO in next PR
+   // We should skip the notification if this channel is already 
released.
+   if (!isReleased.get() && partitionRequestClient != null) {
--- End diff --

shouldn't we
```
checkState(partitionRequestClient != null, "Tried to send 
credit announcement to producer before requesting a queue.");`
```
here as well? At the moment I don't see a valid usecase for `== null` and 
only a potential problem with the notification not being tried again.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267109#comment-16267109
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153258008
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -88,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) {
}
}
 
+   void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
--- End diff --

Can you please add a comment under which circumstances not to call this, 
i.e. we must make sure, `ctx` is assigned yet (so after the channel has been 
activated somehow). I checked the uses of this method and those seem to be 
safe, i.e. in `RemoteInputChannel`s `#notifyBufferAvailable()`, 
`#onSenderBacklog()`, and `#recycle()`. All of these should only happen after 
some interaction with the channel.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267110#comment-16267110
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153263139
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -274,4 +313,49 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
--- End diff --

Please add some javadoc with a hint how all `inputChannelsWithCredit` will 
be handled, i.e. one is written immediately, following ones after successful 
writes.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8154) JobSubmissionClientActor submited job,but there is no connection to a JobManager

2017-11-27 Thread Gregory Melekh (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gregory Melekh closed FLINK-8154.
-
Resolution: Not A Problem

> JobSubmissionClientActor  submited job,but there is no connection to a 
> JobManager
> -
>
> Key: FLINK-8154
> URL: https://issues.apache.org/jira/browse/FLINK-8154
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.2
> Environment: Kubernetes 1.8.3, Platform "Linux/amd64"
>Reporter: Gregory Melekh
>Priority: Blocker
>
> There is JobManager log file bellow.
> 2017-11-26 08:17:13,435 INFO  org.apache.flink.client.CliFrontend 
>   - 
> 
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Starting Command Line Client (Version: 1.3.2, Rev:0399bee, 
> Date:03.08.2017 @ 10:23:11 UTC)
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Current user: root
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 
> 1.8/25.131-b11
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  Maximum heap size: 6252 MiBytes
> 2017-11-26 08:17:13,437 INFO  org.apache.flink.client.CliFrontend 
>   -  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2017-11-26 08:17:13,439 INFO  org.apache.flink.client.CliFrontend 
>   -  Hadoop version: 2.7.2
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  JVM Options:
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - 
> -Dlog.file=/opt/flink/log/flink--client-act-monitor-flink-jobmanager-66cd4bdb5c-8kxbh.log
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -Dlog4j.configuration=file:/etc/flink/log4j-cli.properties
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -Dlogback.configurationFile=file:/etc/flink/logback.xml
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  Program Arguments:
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - run
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - -c
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - monitoring.flow.AccumulateAll
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - /tmp/monitoring-0.0.1-SNAPSHOT.jar
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   -  Classpath: 
> /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar:::
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - 
> 
> 2017-11-26 08:17:13,440 INFO  org.apache.flink.client.CliFrontend 
>   - Using configuration directory /etc/flink
> 2017-11-26 08:17:13,441 INFO  org.apache.flink.client.CliFrontend 
>   - Trying to load configuration file
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: blob.server.port, 6124
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.address, act-monitor-flink-jobmanager
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2017-11-26 08:17:13,443 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.web.log.path, 
> /etc/flink/log/act-monitor-flink-jobmanager.log
> 2017-11-26 08:17:13,444 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2017-11-26 08:17:13,444 INFO  
> org.apache.flink.configuration.GlobalConfiguration

[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2017-11-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153234962
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -190,6 +199,11 @@ public static TaskManagerServices fromConfiguration(
 
final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
 
+   final CheckpointCacheManager checkpointCacheManager = new 
CheckpointCacheManager(
+   new ScheduledThreadPoolExecutor(1),
+   Executors.directExecutor(),
+   taskManagerServicesConfiguration.getTmpDirPaths()[0]);
--- End diff --

I find this problematic, because it does not consider all the configured 
tmp directories. While most user probably have only a single tmp directory 
configured, this can be problematic if somebody makes use of multiple 
directories (e.g. to utilize multiple smaller disks). We should also be 
sensitive about this case.


---


[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266965#comment-16266965
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153234962
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -190,6 +199,11 @@ public static TaskManagerServices fromConfiguration(
 
final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
 
+   final CheckpointCacheManager checkpointCacheManager = new 
CheckpointCacheManager(
+   new ScheduledThreadPoolExecutor(1),
+   Executors.directExecutor(),
+   taskManagerServicesConfiguration.getTmpDirPaths()[0]);
--- End diff --

I find this problematic, because it does not consider all the configured 
tmp directories. While most user probably have only a single tmp directory 
configured, this can be problematic if somebody makes use of multiple 
directories (e.g. to utilize multiple smaller disks). We should also be 
sensitive about this case.


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> Why i introduce this:
> Current recover strategy will always read checkpoint data from remote 
> FileStream (HDFS). This will cost a lot of bandwidth when the state is so big 
> (e.g. 1T). What's worse, if this job performs recover again and again, it can 
> eat up all network bandwidth and do a huge hurt to cluster. So, I proposed 
> that we can cache the checkpoint data locally, and read checkpoint data from 
> local cache as well as we can, we read the data from remote only if we fail 
> locally. The advantage is that if a execution is assigned to the same 
> TaskManager as before, it can save a lot of bandwith, and obtain a faster 
> recover.
> Solution:
> TaskManager do the cache job and manage the cached data itself. It simple 
> use a TTL-like method to manage cache entry's dispose, we dispose a entry if 
> it wasn't be touched for a X time, once we touch a entry we reset the TTL for 
> it. In this way, all jobs is done by TaskManager, it transparent to 
> JobManager. The only problem is that we may dispose a entry that maybe 
> useful, in this case, we have to read from remote data finally, but users can 
> avoid this by set a proper TTL value according to checkpoint interval and 
> other things.
> Can someone give me some advice? I would appreciate it very much~



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2017-11-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153235421
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
+   } else if (stateHandle instanceof CachedStreamStateHandle) {
--- End diff --

This means we are actually introducing significant new code to the job 
manager, that even impacts the serialization format. I think this should not 
strictly be required if we map local state to checkpoint ids.


---


[jira] [Commented] (FLINK-8087) Decouple Slot from SlotPool

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267088#comment-16267088
 ] 

ASF GitHub Bot commented on FLINK-8087:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5088

[FLINK-8087] Decouple Slot from AllocatedSlot

## What is the purpose of the change

This commit introduces the SlotContext which is an abstraction for the 
SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by 
the
SlotPool.

This PR is based on #5087. 

## Brief change log

- Introduce `SlotContext` as simple abstraction for slot related information
- Remove dependency of `Slot` on `AllocatedSlot` which is now only used 
internally by the `SlotPool`.
- Introduce `SimpleSlotContext` which implements `SlotContext` and acts as 
the slot context for the `SimpleSlot`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink decoupleSlotFromSlotPool

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5088.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5088


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove isCanceled, isReleased method and decouple logical slot from 
Execution by
introducing a Payload interface which is set for a LogicalSlot. The Payload 
interface
is implemented by the Execution and allows to fail an implementation and 
obtaining
a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the 
SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by 
the
SlotPool.




> Decouple Slot from SlotPool
> ---
>
> Key: FLINK-8087
> URL: https://issues.apache.org/jira/browse/FLINK-8087
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to let the {{SlotPool}} return a a different {{LogicalSlot}} 
> implementation than {{SimpleSlot}} we should not store the {{Slot}} inside of 
> the {{SlotPool}}. Moreover, we should introduce a abstraction for the 
> {{AllocatedSlot}} which contains the information required by the 
> {{SimpleSlot}}. That way we decouple the {{SimpleSlot}} from the 
> {{AllocatedSlot}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot

2017-11-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5088

[FLINK-8087] Decouple Slot from AllocatedSlot

## What is the purpose of the change

This commit introduces the SlotContext which is an abstraction for the 
SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by 
the
SlotPool.

This PR is based on #5087. 

## Brief change log

- Introduce `SlotContext` as simple abstraction for slot related information
- Remove dependency of `Slot` on `AllocatedSlot` which is now only used 
internally by the `SlotPool`.
- Introduce `SimpleSlotContext` which implements `SlotContext` and acts as 
the slot context for the `SimpleSlot`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink decoupleSlotFromSlotPool

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5088.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5088


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove isCanceled, isReleased method and decouple logical slot from 
Execution by
introducing a Payload interface which is set for a LogicalSlot. The Payload 
interface
is implemented by the Execution and allows to fail an implementation and 
obtaining
a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the 
SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by 
the
SlotPool.




---


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266940#comment-16266940
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4581
  
Still failing with some checkstyle violations in `ResultPartition.java`


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266966#comment-16266966
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5074#discussion_r153235421
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -510,6 +512,13 @@ private static void serializeStreamStateHandle(
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
+   } else if (stateHandle instanceof CachedStreamStateHandle) {
--- End diff --

This means we are actually introducing significant new code to the job 
manager, that even impacts the serialization format. I think this should not 
strictly be required if we map local state to checkpoint ids.


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>
> Why i introduce this:
> Current recover strategy will always read checkpoint data from remote 
> FileStream (HDFS). This will cost a lot of bandwidth when the state is so big 
> (e.g. 1T). What's worse, if this job performs recover again and again, it can 
> eat up all network bandwidth and do a huge hurt to cluster. So, I proposed 
> that we can cache the checkpoint data locally, and read checkpoint data from 
> local cache as well as we can, we read the data from remote only if we fail 
> locally. The advantage is that if a execution is assigned to the same 
> TaskManager as before, it can save a lot of bandwith, and obtain a faster 
> recover.
> Solution:
> TaskManager do the cache job and manage the cached data itself. It simple 
> use a TTL-like method to manage cache entry's dispose, we dispose a entry if 
> it wasn't be touched for a X time, once we touch a entry we reset the TTL for 
> it. In this way, all jobs is done by TaskManager, it transparent to 
> JobManager. The only problem is that we may dispose a entry that maybe 
> useful, in this case, we have to read from remote data finally, but users can 
> avoid this by set a proper TTL value according to checkpoint interval and 
> other things.
> Can someone give me some advice? I would appreciate it very much~



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8161) Flakey YARNSessionCapacitySchedulerITCase on Travis

2017-11-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8161:


 Summary: Flakey YARNSessionCapacitySchedulerITCase on Travis
 Key: FLINK-8161
 URL: https://issues.apache.org/jira/browse/FLINK-8161
 Project: Flink
  Issue Type: Bug
  Components: Tests, YARN
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Critical
 Fix For: 1.5.0


The {{YARNSessionCapacitySchedulerITCase}} spuriously fails on Travis because 
it now contains {{2017-11-25 22:49:49,204 WARN  
akka.remote.transport.netty.NettyTransport- Remote 
connection to [null] failed with java.nio.channels.NotYetConnectedException}} 
from time to time in the logs. I suspect that this is due to switching from 
Flakka to Akka 2.4.0. In order to solve this problem I propose to add this log 
statement to the whitelisted log statements.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r153253247
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
--- End diff --

Accidentally found that this was not addressed in the newest commits...can 
you change this?


---


[jira] [Commented] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1

2017-11-27 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267037#comment-16267037
 ] 

Greg Hogan commented on FLINK-7642:
---

Please stop adding and removing a blank line from the description every few 
days.

Does 2.20.1 resolve SUREFIRE-1255? From the parent {{pom.xml}}:

{noformat}

2.18.1
{noformat}

> Upgrade maven surefire plugin to 2.19.1
> ---
>
> Key: FLINK-7642
> URL: https://issues.apache.org/jira/browse/FLINK-7642
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> Surefire 2.19 release introduced more useful test filters which would let us 
> run a subset of the test.
> This issue is for upgrading maven surefire plugin to 2.19.1



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-11-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5091

[FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to 
SlotPool

## What is the purpose of the change

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Moreover, queued scheduling means that a slot request need not
be completed right away but may be fulfilled at a later point in time. This makes
it possible to start new TaskExecutors in case there are no more slots left.

The main component responsible for the management of shared slots is the
SlotSharingManager. The SlotSharingManager maintains internally a tree-like
structure which stores the SlotContext future of the underlying
AllocatedSlot. Whenever this future is completed, potentially pending
LogicalSlot instantiations are executed and sent to the slot requester.

A shared slot is represented by a MultiTaskSlot which can harbour multiple
TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot.

In order to represent co-location constraints, we first obtain a root
MultiTaskSlot and then allocate a nested MultiTaskSlot in which the
co-located tasks are allocated. The corresponding SlotRequestID is assigned
to the CoLocationConstraint in order to make the TaskSlot retrievable for
other tasks assigned to the same CoLocationConstraint.
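The tree structure described above can be sketched as follows. This is a simplified, hypothetical illustration (class and method names are made up), not the actual SlotSharingManager code:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the slot-sharing tree: a MultiTaskSlot can contain
// further TaskSlots, while a SingleTaskSlot is a leaf for a single task.
abstract class TaskSlotSketch {
    final String id;
    TaskSlotSketch(String id) { this.id = id; }
    abstract int numLeaves();
}

class SingleTaskSlotSketch extends TaskSlotSketch {
    SingleTaskSlotSketch(String id) { super(id); }
    @Override int numLeaves() { return 1; }
}

class MultiTaskSlotSketch extends TaskSlotSketch {
    final List<TaskSlotSketch> children = new ArrayList<>();
    MultiTaskSlotSketch(String id) { super(id); }

    // A nested MultiTaskSlot, e.g. for a group of co-located tasks.
    MultiTaskSlotSketch allocateMultiTaskSlot(String childId) {
        MultiTaskSlotSketch child = new MultiTaskSlotSketch(childId);
        children.add(child);
        return child;
    }

    SingleTaskSlotSketch allocateSingleTaskSlot(String childId) {
        SingleTaskSlotSketch child = new SingleTaskSlotSketch(childId);
        children.add(child);
        return child;
    }

    @Override int numLeaves() {
        int n = 0;
        for (TaskSlotSketch child : children) {
            n += child.numLeaves();
        }
        return n;
    }
}
```

A co-location constraint would then correspond to a nested MultiTaskSlot under the root, with the co-located tasks as SingleTaskSlot leaves next to any freely scheduled siblings.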

This PR also moves the `SlotPool` components to 
`o.a.f.runtime.jobmaster.slotpool`.

This PR is based on #5090 

## Brief change log

- Add `SlotSharingManager` to manage shared slots
- Rework `SlotPool` to use `SlotSharingManager`
- Add `SlotPool#allocateMultiTaskSlot` to allocate a shared slot
- Add `SlotPool#allocateCoLocatedMultiTaskSlot` to allocate a co-located 
slot
- Move `SlotPool` components to `o.a.f.runtime.jobmaster.slotpool`

## Verifying this change

- Port `SchedulerSlotSharingTest`, `SchedulerIsolatedTasksTest` and
`ScheduleWithCoLocationHintTest` to run with `SlotPool`
- Add `SlotSharingManagerTest`, `SlotPoolSlotSharingTest` and
`SlotPoolCoLocationTest` 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink slotPoolSlots

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5091.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5091


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementations, which at the moment are Slot, SimpleSlot and SharedSlot.
This is a helpful step towards introducing a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot from the
Execution by introducing a Payload interface which is set for a LogicalSlot. The
Payload interface is implemented by the Execution and allows failing an
implementation and obtaining a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext, an abstraction that allows the SimpleSlot
to obtain the relevant slot information for communicating with the
TaskManager without relying on the AllocatedSlot, which is now only used by the
SlotPool.

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088] 

[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267135#comment-16267135
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5091

[FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to 
SlotPool

## What is the purpose of the change

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Moreover, queued scheduling means that a slot request need not
be completed right away but may be fulfilled at a later point in time. This makes
it possible to start new TaskExecutors in case there are no more slots left.

The main component responsible for the management of shared slots is the
SlotSharingManager. The SlotSharingManager maintains internally a tree-like
structure which stores the SlotContext future of the underlying
AllocatedSlot. Whenever this future is completed, potentially pending
LogicalSlot instantiations are executed and sent to the slot requester.

A shared slot is represented by a MultiTaskSlot which can harbour multiple
TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot.

In order to represent co-location constraints, we first obtain a root
MultiTaskSlot and then allocate a nested MultiTaskSlot in which the
co-located tasks are allocated. The corresponding SlotRequestID is assigned
to the CoLocationConstraint in order to make the TaskSlot retrievable for
other tasks assigned to the same CoLocationConstraint.

This PR also moves the `SlotPool` components to 
`o.a.f.runtime.jobmaster.slotpool`.

This PR is based on #5090 

## Brief change log

- Add `SlotSharingManager` to manage shared slots
- Rework `SlotPool` to use `SlotSharingManager`
- Add `SlotPool#allocateMultiTaskSlot` to allocate a shared slot
- Add `SlotPool#allocateCoLocatedMultiTaskSlot` to allocate a co-located 
slot
- Move `SlotPool` components to `o.a.f.runtime.jobmaster.slotpool`

## Verifying this change

- Port `SchedulerSlotSharingTest`, `SchedulerIsolatedTasksTest` and
`ScheduleWithCoLocationHintTest` to run with `SlotPool`
- Add `SlotSharingManagerTest`, `SlotPoolSlotSharingTest` and
`SlotPoolCoLocationTest` 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink slotPoolSlots

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5091.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5091


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementations, which at the moment are Slot, SimpleSlot and SharedSlot.
This is a helpful step towards introducing a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot from the
Execution by introducing a Payload interface which is set for a LogicalSlot. The
Payload interface is implemented by the Execution and allows failing an
implementation and obtaining a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext, an abstraction that allows the SimpleSlot
to obtain the relevant slot information for communicating with the
TaskManager without relying on the AllocatedSlot, which is now only used by the
SlotPool.

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2017-11-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r152985049
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException 
{
BufferPool bufferPool = null;
 
try {
-			if (gate.getConsumedPartitionType().isCreditBased()) {
-				// Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
-				bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
-				gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-			} else {
-				int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-					gate.getNumberOfInputChannels() * networkBuffersPerChannel + extraNetworkBuffersPerGate :
-					Integer.MAX_VALUE;
-				bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
-					maxNumberOfMemorySegments);
-			}
+			// Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
+			bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
+			gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --

What about the non-bounded partition type that we use for batch processing? 
Shouldn't we use an unbounded number of floating buffers there, as previously?
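For reference, the removed branch sized the pool roughly as follows. This is a standalone restatement of the deleted sizing logic with the computation pulled into a helper (the class name and the parameter values in the test are illustrative, not from Flink):

```java
// Standalone restatement of the removed sizing logic: bounded partition
// types cap the number of memory segments per gate, while unbounded
// (batch) partition types effectively get an unlimited pool.
class BufferPoolSizingSketch {
    static int maxNumberOfMemorySegments(
            boolean isBounded,
            int numberOfInputChannels,
            int networkBuffersPerChannel,
            int extraNetworkBuffersPerGate) {
        return isBounded
            ? numberOfInputChannels * networkBuffersPerChannel + extraNetworkBuffersPerGate
            : Integer.MAX_VALUE;
    }
}
```

The reviewer's concern is precisely that the new credit-based path always creates a fixed-size pool, dropping the `Integer.MAX_VALUE` case for unbounded partitions.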


---


[GitHub] flink pull request #5081: [FLINK-7717][flip6] Migrate TaskManagerMetricsHand...

2017-11-27 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5081#discussion_r153243460
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns TaskManager metrics.
+ *
+ * @see MetricStore#getTaskManagerMetricStore(String)
+ */
+public class TaskManagerMetricsHandler extends 
AbstractMetricsHandler {
+
+   public TaskManagerMetricsHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map headers,
+   final MetricFetcher metricFetcher) {
+   super(localRestAddress, leaderRetriever, timeout, headers, 
TaskManagerMetricsHeaders.getInstance(),
+   metricFetcher);
+   }
+
+   @Nullable
+   @Override
+   protected MetricStore.ComponentMetricStore getComponentMetricStore(
+   final HandlerRequest request,
+   final MetricStore metricStore) {
+   final InstanceID pathParameter = 
request.getPathParameter(TaskManagerIdPathParameter.class);
--- End diff --

This PR should probably be merged after 
[FLINK-8150](https://issues.apache.org/jira/browse/FLINK-8150).


---


[jira] [Commented] (FLINK-7717) Port TaskManagerMetricsHandler to new REST endpoint

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267007#comment-16267007
 ] 

ASF GitHub Bot commented on FLINK-7717:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5081#discussion_r153243460
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns TaskManager metrics.
+ *
+ * @see MetricStore#getTaskManagerMetricStore(String)
+ */
+public class TaskManagerMetricsHandler extends 
AbstractMetricsHandler {
+
+   public TaskManagerMetricsHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map headers,
+   final MetricFetcher metricFetcher) {
+   super(localRestAddress, leaderRetriever, timeout, headers, 
TaskManagerMetricsHeaders.getInstance(),
+   metricFetcher);
+   }
+
+   @Nullable
+   @Override
+   protected MetricStore.ComponentMetricStore getComponentMetricStore(
+   final HandlerRequest request,
+   final MetricStore metricStore) {
+   final InstanceID pathParameter = 
request.getPathParameter(TaskManagerIdPathParameter.class);
--- End diff --

This PR should probably be merged after 
[FLINK-8150](https://issues.apache.org/jira/browse/FLINK-8150).


> Port TaskManagerMetricsHandler to new REST endpoint
> ---
>
> Key: FLINK-7717
> URL: https://issues.apache.org/jira/browse/FLINK-7717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{TaskManagerMetricsHandler}} to new REST endpoint.





[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267030#comment-16267030
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r153253247
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+    * Add the exclusive buffer into the queue, and recycle one floating buffer if the
+    * number of available buffers in queue is more than required amount.
+    *
+    * @param buffer The exclusive buffer of this channel.
+    * @return Whether to recycle one floating buffer.
+    */
+   boolean maintainTargetSize(Buffer buffer) {
--- End diff --

Accidentally found that this was not addressed in the newest commits...can 
you change this?


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from the producer. It will request a buffer from {{BufferPool}} 
> for holding the message. If none is available, the message is staged temporarily 
> and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a 
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}
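The receiver-side behavior described above can be sketched roughly as follows. This is an illustrative model only (strings stand in for buffers, and the class and method names are made up), not the actual RemoteInputChannel/handler code:

```java
import java.util.ArrayDeque;

// Sketch of the credit-based receiver: buffers for incoming messages come
// from the channel's own queue, and a growing sender backlog may trigger
// requests for additional floating buffers from the pool.
class ReceiverChannelSketch {
    final ArrayDeque<String> bufferQueue = new ArrayDeque<>();
    int senderBacklog;

    ReceiverChannelSketch(int initialBuffers) {
        for (int i = 0; i < initialBuffers; i++) {
            bufferQueue.add("buffer-" + i);
        }
    }

    /** Hands out a buffer for an incoming message (null only if credits were overrun). */
    String requestBuffer() {
        return bufferQueue.poll();
    }

    /** Updates the sender backlog; returns how many floating buffers to request. */
    int onSenderBacklog(int backlog) {
        senderBacklog = backlog;
        return Math.max(0, backlog - bufferQueue.size());
    }
}
```

The key point is that the handler never has to stage messages or toggle `autoread`: the channel itself knows from the announced backlog how many floating buffers to fetch ahead of time.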





[jira] [Commented] (FLINK-8089) Fulfil slot requests with unused offered slots

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267101#comment-16267101
 ] 

ASF GitHub Bot commented on FLINK-8089:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5090

[FLINK-8089] Also check for other pending slot requests in 
SlotPool#offerSlot

## What is the purpose of the change

Not only check for a slot request with the right allocation id but also 
check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots in `SlotPool`.
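A minimal sketch of that offer logic, under stated assumptions: strings stand in for AllocationID/AllocatedSlot, the class is invented for illustration, and this is not the actual SlotPool implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch: an offered slot first completes the request with the matching
// allocation id, then any other pending request, and only otherwise is
// added to the list of available slots.
class SlotOfferSketch {
    final Map<String, CompletableFuture<String>> pendingRequests = new HashMap<>();
    final List<String> availableSlots = new ArrayList<>();

    void offerSlot(String allocationId, String slot) {
        CompletableFuture<String> request = pendingRequests.remove(allocationId);
        if (request == null && !pendingRequests.isEmpty()) {
            // the offered slot is unclaimed: fulfill some other pending request
            String otherRequestId = pendingRequests.keySet().iterator().next();
            request = pendingRequests.remove(otherRequestId);
        }
        if (request != null) {
            request.complete(slot);
        } else {
            availableSlots.add(slot);
        }
    }
}
```

Without the second check, a slot whose original requester has gone away would sit idle in the available list even while other requests wait.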

This PR is based on #5089.

## Verifying this change

- Added `SlotPoolTest#testFulfillingSlotRequestsWithUnusedOfferedSlots` to 
check that unused offered slots are directly used to fulfill other pending slot 
requests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixSlotOffering

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5090


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementations, which at the moment are Slot, SimpleSlot and SharedSlot.
This is a helpful step towards introducing a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot from the
Execution by introducing a Payload interface which is set for a LogicalSlot. The
Payload interface is implemented by the Execution and allows failing an
implementation and obtaining a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext, an abstraction that allows the SimpleSlot
to obtain the relevant slot information for communicating with the
TaskManager without relying on the AllocatedSlot, which is now only used by the
SlotPool.

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088] Associate logical slots with the slot request id

Before, logical slots like the SimpleSlot and SharedSlot were associated with the
actually allocated slot via the AllocationID. This, however, was sub-optimal
because allocated slots can be re-used to also fulfill other slot requests
(logical slots). Therefore, we should bind the logical slots to the right id with
the right lifecycle, which is the slot request id.

commit 3e4550c0607744b20893dc90c587b63e68e4de1e
Author: Till Rohrmann 
Date:   2017-11-13T14:42:07Z

[FLINK-8089] Also check for other pending slot requests in offerSlot

Not only check for a slot request with the right allocation id but also 
check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots.

commit b04dda46aaf298d921929910574662970d9c5093
Author: Till Rohrmann 
Date:   2017-11-24T22:29:53Z

[hotfix] Speed up RecoveryITCase




> Fulfil slot requests with unused offered slots
> --
>
> Key: FLINK-8089
> URL: https://issues.apache.org/jira/browse/FLINK-8089
> Project: Flink
>  Issue Type: Improvement
>  Components: 

[GitHub] flink pull request #:

2017-11-27 Thread zentol
Github user zentol commented on the pull request:


https://github.com/apache/flink/commit/c940d5eff9897796625a696ed2989aed52c39ebd#commitcomment-25875079
  
In tools/releasing/create_source_release.sh:
In tools/releasing/create_source_release.sh on line 60:
maybe we should have a wildcard exclude for all files/directories starting 
with `.`.


---


[jira] [Commented] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1

2017-11-27 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267051#comment-16267051
 ] 

Ted Yu commented on FLINK-7642:
---

Over in hbase, we use 2.20.1.

SUREFIRE-1255 has been inactive - it seems 2.20.1 should not have the test hang 
issue.

> Upgrade maven surefire plugin to 2.19.1
> ---
>
> Key: FLINK-7642
> URL: https://issues.apache.org/jira/browse/FLINK-7642
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> The Surefire 2.19 release introduced more useful test filters which would let us 
> run a subset of the tests.
> This issue is for upgrading the maven surefire plugin to 2.19.1.





[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...

2017-11-27 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5090

[FLINK-8089] Also check for other pending slot requests in 
SlotPool#offerSlot

## What is the purpose of the change

Not only check for a slot request with the right allocation id but also 
check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots in `SlotPool`.

This PR is based on #5089.

## Verifying this change

- Added `SlotPoolTest#testFulfillingSlotRequestsWithUnusedOfferedSlots` to 
check that unused offered slots are directly used to fulfill other pending slot 
requests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixSlotOffering

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5090.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5090


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementations, which at the moment are Slot, SimpleSlot and SharedSlot.
This is a helpful step towards introducing a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot from the
Execution by introducing a Payload interface which is set for a LogicalSlot. The
Payload interface is implemented by the Execution and allows failing an
implementation and obtaining a termination future.

Introduce proper Execution#releaseFuture which is completed once the 
Execution's
assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann 
Date:   2017-11-24T17:03:49Z

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext, an abstraction that allows the SimpleSlot
to obtain the relevant slot information for communicating with the
TaskManager without relying on the AllocatedSlot, which is now only used by the
SlotPool.

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann 
Date:   2017-11-24T17:06:10Z

[FLINK-8088] Associate logical slots with the slot request id

Before, logical slots like the SimpleSlot and SharedSlot were associated with the
actually allocated slot via the AllocationID. This, however, was sub-optimal
because allocated slots can be re-used to also fulfill other slot requests
(logical slots). Therefore, we should bind the logical slots to the right id with
the right lifecycle, which is the slot request id.

commit 3e4550c0607744b20893dc90c587b63e68e4de1e
Author: Till Rohrmann 
Date:   2017-11-13T14:42:07Z

[FLINK-8089] Also check for other pending slot requests in offerSlot

Not only check for a slot request with the right allocation id but also 
check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots.

commit b04dda46aaf298d921929910574662970d9c5093
Author: Till Rohrmann 
Date:   2017-11-24T22:29:53Z

[hotfix] Speed up RecoveryITCase




---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266984#comment-16266984
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r152985049
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException 
{
BufferPool bufferPool = null;
 
try {
-			if (gate.getConsumedPartitionType().isCreditBased()) {
-				// Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
-				bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
-				gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-			} else {
-				int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-					gate.getNumberOfInputChannels() * networkBuffersPerChannel + extraNetworkBuffersPerGate :
-					Integer.MAX_VALUE;
-				bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
-					maxNumberOfMemorySegments);
-			}
+			// Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
+			bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
+			gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --

What about the non-bounded partition type that we use for batch processing? 
Shouldn't we use an unbounded number of floating buffers there, as previously?


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.
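The sender-side bookkeeping described above can be illustrated with a minimal, self-contained sketch. This is a hypothetical simplification, not Flink's actual classes: `SimpleSubpartitionView`, `addCredit`, and the plain queue standing in for the Netty handler are all illustrative names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the sender-side credit bookkeeping described above;
// class and method names are illustrative, not Flink's actual API.
class SimpleSubpartitionView {
	// Credit granted by the receiver; increased by PartitionRequest/AddCredit deltas.
	private final AtomicInteger currentCredit = new AtomicInteger(0);
	// Marks the view as registered for transfer so it is enqueued at most once.
	private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);
	private int availableBuffers = 0;

	// Stand-in for the handler's queue of views that are ready to transfer data.
	private final Queue<SimpleSubpartitionView> transferQueue;

	SimpleSubpartitionView(Queue<SimpleSubpartitionView> transferQueue) {
		this.transferQueue = transferQueue;
	}

	void addBuffers(int n) {
		availableBuffers += n;
		maybeEnqueue();
	}

	// Called for PartitionRequest (initial credit) and AddCredit (delta) messages.
	void addCredit(int delta) {
		int oldCredit = currentCredit.getAndAdd(delta);
		// Only a transition away from zero credit can unblock this view.
		if (oldCredit == 0) {
			maybeEnqueue();
		}
	}

	private void maybeEnqueue() {
		// Enqueue only if there is credit, data, and the view is not queued yet.
		if (currentCredit.get() > 0 && availableBuffers > 0
				&& registeredAvailable.compareAndSet(false, true)) {
			transferQueue.add(this);
		}
	}

	boolean isQueued() {
		return registeredAvailable.get();
	}
}
```

With credit at zero, adding buffers does not enqueue the view; the first credit delta does, and further deltas leave it enqueued only once.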



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8085) Thin out the LogicalSlot interface

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267082#comment-16267082
 ] 

ASF GitHub Bot commented on FLINK-8085:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5087

[FLINK-8085] Thin out LogicalSlot interface

## What is the purpose of the change

Remove the isCanceled and isReleased methods and decouple the logical slot
from the Execution by introducing a Payload interface which is set on a
LogicalSlot. The Payload interface is implemented by the Execution and allows
failing the payload and obtaining its termination future.

Introduce a proper Execution#releaseFuture which is completed once the
Execution's assigned resource has been released.

This PR is based on #5086.
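The decoupling described above can be sketched as follows. This is a hedged illustration only: `SlotPayload` and `DemoLogicalSlot` are simplified stand-ins, not the actual `LogicalSlot.Payload` and `Execution` types in Flink's runtime.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch of the payload decoupling described above; names are
// simplified stand-ins for Flink's actual LogicalSlot/Execution interfaces.
interface SlotPayload {
	// Fail the payload, e.g. when the underlying slot is lost.
	void fail(Throwable cause);

	// Completed once the payload has reached a terminal state.
	CompletableFuture<Void> getTerminalStateFuture();
}

class DemoLogicalSlot {
	private SlotPayload payload;

	// The slot only knows the payload abstraction, not the Execution class.
	boolean tryAssignPayload(SlotPayload payload) {
		if (this.payload != null) {
			return false; // a payload may be assigned at most once
		}
		this.payload = payload;
		return true;
	}

	// Releasing the slot fails its payload and returns the termination future.
	CompletableFuture<Void> release(Throwable cause) {
		if (payload == null) {
			return CompletableFuture.completedFuture(null);
		}
		payload.fail(cause);
		return payload.getTerminalStateFuture();
	}
}
```

The point of the indirection is that the slot can fail and await whatever runs on it without a compile-time dependency on the Execution class.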

## Brief change log

- Remove unnecessary methods from `LogicalSlot` interface
- Introduce abstraction for logical slot payload
- Let `Execution` implement `LogicalSlot.Payload`

## Verifying this change

This change added tests and can be verified as follows:

- Added `ExecutionTest#testTerminationFutureIsCompletedAfterSlotRelease` to 
check that the returned termination future of the `ExecutionVertex#cancel` is 
completed after the assigned resource has been released.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

CC: @GJL 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink thinoutLogicalSlot

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5087.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5087


commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann 
Date:   2017-11-14T22:50:52Z

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann 
Date:   2017-11-15T13:20:27Z

[FLINK-8085] Thin out LogicalSlot interface

Remove the isCanceled and isReleased methods and decouple the logical slot
from the Execution by introducing a Payload interface which is set on a
LogicalSlot. The Payload interface is implemented by the Execution and allows
failing the payload and obtaining its termination future.

Introduce a proper Execution#releaseFuture which is completed once the
Execution's assigned resource has been released.




> Thin out the LogicalSlot interface
> --
>
> Key: FLINK-8085
> URL: https://issues.apache.org/jira/browse/FLINK-8085
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{LogicalSlot}} interface contains methods which we don't strictly need 
> (e.g. {{isCanceled}}, {{isReleased}}). Moreover, we should decouple the 
> {{LogicalSlot}} from the {{Execution}} by only setting the 
> {{ExecutionAttemptID}} instead of the {{Execution}}.






[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-11-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153261175
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -152,6 +170,26 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
		}
	}

+	@Override
+	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
+		if (msg instanceof RemoteInputChannel) {
+			boolean triggerWrite = inputChannelsWithCredit.isEmpty();
--- End diff --

how about some small comment as in `PartitionRequestQueue`? Something like
```
// Queue an input channel for available credits announcement.
// If the queue is empty, we try to trigger the actual write.
// Otherwise this will be handled by the
// writeAndFlushNextMessageIfPossible calls.
```


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-11-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153269318
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -283,10 +283,13 @@ public String toString() {
	// 

	/**
-	 * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+	 * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
	 */
	void notifyCreditAvailable() {
-		//TODO in next PR
+		// We should skip the notification if this channel is already released.
+		if (!isReleased.get() && partitionRequestClient != null) {
--- End diff --

shouldn't we
```
checkState(partitionRequestClient != null, "Tried to send credit announcement to producer before requesting a queue.");
```
here as well? At the moment I don't see a valid use case for `== null` and 
only a potential problem with the notification not being tried again.


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-11-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153258008
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -88,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) {
		}
	}

+	void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
--- End diff --

Can you please add a comment under which circumstances not to call this, 
i.e. we must make sure `ctx` is already assigned (so after the channel has 
been activated somehow). I checked the uses of this method and those seem to 
be safe, i.e. in `RemoteInputChannel`'s `#notifyBufferAvailable()`, 
`#onSenderBacklog()`, and `#recycle()`. All of these should only happen after 
some interaction with the channel.


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-11-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153263139
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -274,4 +313,49 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.B
			bufferOrEvent.releaseBuffer();
		}
	}
+
+	private void writeAndFlushNextMessageIfPossible(Channel channel) {
--- End diff --

Please add some Javadoc with a hint at how all `inputChannelsWithCredit` 
will be handled, i.e. one is written immediately, the following ones after 
successful writes.
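The draining pattern asked about here, one message written immediately, the rest written from the write-completion callback, can be sketched without Netty. This is a hypothetical single-threaded model: `CreditAnnouncer`, the string channel IDs, and the explicit `onWriteComplete()` callback are all illustrative, not the handler's actual API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the queue-draining pattern discussed above:
// the first enqueued channel triggers a write, and each completed write
// triggers the write for the next queued channel. Names are illustrative.
class CreditAnnouncer {
	private final Queue<String> channelsWithCredit = new ArrayDeque<>();
	private final List<String> sent = new ArrayList<>();
	private boolean writeInFlight = false;

	void notifyCreditAvailable(String channelId) {
		boolean triggerWrite = channelsWithCredit.isEmpty();
		channelsWithCredit.add(channelId);
		// Only trigger when the queue was empty; otherwise the completion
		// callback of the in-flight write will pick this channel up.
		if (triggerWrite) {
			writeNextIfPossible();
		}
	}

	// Invoked by the transport once the previous write has been flushed.
	void onWriteComplete() {
		writeInFlight = false;
		writeNextIfPossible();
	}

	private void writeNextIfPossible() {
		if (writeInFlight || channelsWithCredit.isEmpty()) {
			return;
		}
		writeInFlight = true;
		sent.add("AddCredit(" + channelsWithCredit.poll() + ")");
	}

	List<String> sent() {
		return sent;
	}
}
```

A second notification arriving while a write is in flight is simply queued; it goes out only after `onWriteComplete()` fires for the first one.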


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-11-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r153266186
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -539,4 +542,60 @@ static CloseRequest readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws
			return new CloseRequest();
		}
	}
+
+	static class AddCredit extends NettyMessage {
--- End diff --

Please add a comment ("incremental credit announcement from the client to 
the server"?).


---


[GitHub] flink issue #5047: Code refine of WordWithCount

2017-11-27 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5047
  
Do we still want to make this change since the PR is now only adding a 
single comment?


---


[jira] [Updated] (FLINK-8126) Update and fix checkstyle

2017-11-27 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-8126:
--
Fix Version/s: 1.4.0

> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.4.0, 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).





[jira] [Commented] (FLINK-8126) Update and fix checkstyle

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267309#comment-16267309
 ] 

ASF GitHub Bot commented on FLINK-8126:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5061


> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).





[GitHub] flink pull request #5061: [hotfix] [docs] Update checkstyle version in docum...

2017-11-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5061


---


[jira] [Closed] (FLINK-8126) Update and fix checkstyle

2017-11-27 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-8126.
-
Resolution: Fixed

1.4: 4eae418b410c928b8e4b7893c1f5b9c48a5e3228

> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.4.0, 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).





[jira] [Reopened] (FLINK-8126) Update and fix checkstyle

2017-11-27 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan reopened FLINK-8126:
---

> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).




