[jira] [Commented] (FLINK-8022) Kafka at-least-once tests fail occasionally
[ https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266685#comment-16266685 ] ASF GitHub Bot commented on FLINK-8022: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5078 [FLINK-8022][kafka] Bump at-least-once timeout in tests Increasing the timeout for reading the records from 30s to 60s seems to solve the issue for the failing at-least-once tests. This is a minor fix in tests that is intended to increase test stability. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f8022 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5078.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5078 commit 31a0f9ecb6f39210e2b7dacee9bcf90f44ceee1f Author: Piotr Nowojski Date: 2017-11-23T11:53:37Z [FLINK-8022][kafka] Bump at-least-once timeout in tests Increasing the timeout for reading the records from 30s to 60s seems to solve the issue for the failing at-least-once tests. > Kafka at-least-once tests fail occasionally > --- > > Key: FLINK-8022 > URL: https://issues.apache.org/jira/browse/FLINK-8022 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > Fix For: 1.5.0 > > > {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}} > seems to sporadically fail with missing data, like this execution: > {code} > > Test > testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase) > is running. 
> > 17:54:30,195 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - > Creating topic oneToOneTopicRegularSink > 17:54:30,196 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - In > getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436 > 17:54:30,204 INFO org.I0Itec.zkclient.ZkEventThread >- Starting ZkClient event thread. > 17:54:30,204 INFO org.I0Itec.zkclient.ZkClient >- Waiting for keeper state SyncConnected > 17:54:30,240 INFO org.I0Itec.zkclient.ZkClient >- zookeeper state changed (SyncConnected) > 17:54:30,261 INFO org.I0Itec.zkclient.ZkEventThread >- Terminate ZkClient event thread. > 17:54:30,265 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - Topic > oneToOneTopicRegularSink create request is successfully posted > 17:54:30,366 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - > Validating if the topic oneToOneTopicRegularSink has been created or not > 17:54:30,373 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - In > getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436 > 17:54:30,374 INFO org.I0Itec.zkclient.ZkEventThread >- Starting ZkClient event thread. > 17:54:30,374 INFO org.I0Itec.zkclient.ZkClient >- Waiting for keeper state SyncConnected > 17:54:30,404 INFO org.I0Itec.zkclient.ZkClient >- zookeeper state changed (SyncConnected) > 17:54:30,420 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - topic > oneToOneTopicRegularSink has been created successfully > 17:54:30,421 INFO org.I0Itec.zkclient.ZkEventThread >- Terminate ZkClient event thread. 
> 17:54:31,099 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - > Starting FlinkKafkaProducer (1/1) to produce into default topic > oneToOneTopicRegularSink > 17:55:05,229 ERROR > org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase - > > Test > testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase) > failed with: > java.lang.AssertionError: Expected to contain all of: <[0, 1, 2, 3, 4, 5, 6, > 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, > 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, > 46, 47,
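The fix in #5078 only enlarges the read deadline. A minimal sketch of the deadline-polling pattern such at-least-once tests rely on (names and structure are illustrative, not Flink's actual test code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

class DeadlinePolling {

    // Polls for newly arrived elements until either every expected element has
    // been seen or the deadline passes, then returns whatever was seen. Bumping
    // timeoutMs (e.g. 30s -> 60s) makes the test tolerate slower brokers.
    static <T> Set<T> readUntil(Supplier<List<T>> poll, Set<T> expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        Set<T> seen = new HashSet<>();
        while (!seen.containsAll(expected) && System.currentTimeMillis() < deadline) {
            seen.addAll(poll.get());
            try {
                Thread.sleep(10); // brief back-off between polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return seen;
    }
}
```

The assertion in the failing test ("Expected to contain all of ...") then fires only when the deadline expires before all records arrive.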
[GitHub] flink issue #4594: [FLINK-7517][network] let NettyBufferPool extend PooledBy...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4594 Yes @zhijiangW, I kept the original logic. I guess the reasoning behind using only off-heap netty buffers was to reduce the overhead before transmitting messages over the wire: 1) we reduce GC overhead somewhat, and 2) at some point the memory needs to be off-heap and put into kernel space anyway - depending on netty, this may be optimised if it is already off-heap. Also, starting with #4481 we will only be using off-heap network buffers anyway. ---
[jira] [Commented] (FLINK-7517) let NettyBufferPool extend PooledByteBufAllocator
[ https://issues.apache.org/jira/browse/FLINK-7517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266688#comment-16266688 ] ASF GitHub Bot commented on FLINK-7517: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4594 Yes @zhijiangW, I kept the original logic. I guess the reasoning behind using only off-heap netty buffers was to reduce the overhead before transmitting messages over the wire: 1) we reduce GC overhead somewhat, and 2) at some point the memory needs to be off-heap and put into kernel space anyway - depending on netty, this may be optimised if it is already off-heap. Also, starting with #4481 we will only be using off-heap network buffers anyway. > let NettyBufferPool extend PooledByteBufAllocator > - > > Key: FLINK-7517 > URL: https://issues.apache.org/jira/browse/FLINK-7517 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{NettyBufferPool}} wraps {{PooledByteBufAllocator}} but, due to this, any > allocated buffer's {{alloc()}} method returns the wrapped > {{PooledByteBufAllocator}}, which allows heap buffers again. By extending the > {{PooledByteBufAllocator}}, we close this loophole and also fix the > invariant that a copy of a buffer should have the same allocator. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
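The `alloc()` loophole described in the issue can be sketched with simplified stand-ins for Netty's allocator and buffer types (not the real Netty API): a wrapping pool leaks the inner, heap-allowing allocator through every buffer it allocates, while a subclass does not.

```java
// Simplified stand-ins for Netty's allocator/buffer types (not the real API),
// showing why wrapping PooledByteBufAllocator leaks a heap-allowing allocator
// through buf.alloc(), while subclassing it does not.
class Allocator {
    boolean heapAllowed() { return true; }
    Buf allocate() { return new Buf(this); } // each buffer remembers its allocator
}

class Buf {
    private final Allocator allocator;
    Buf(Allocator allocator) { this.allocator = allocator; }
    Allocator alloc() { return allocator; }
}

// Wrapping: buffers hand out the inner allocator, which still allows heap buffers.
class WrappingPool {
    private final Allocator inner = new Allocator();
    Buf allocate() { return inner.allocate(); }
}

// Extending: buffers hand out the restricted pool itself, closing the loophole.
class ExtendingPool extends Allocator {
    @Override
    boolean heapAllowed() { return false; }
}
```

This also restores the invariant mentioned in the issue: a copy of a buffer sees the same (restricted) allocator as the original.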
[jira] [Commented] (FLINK-7530) Port existing REST handlers to support Flip-6 components
[ https://issues.apache.org/jira/browse/FLINK-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266744#comment-16266744 ] ASF GitHub Bot commented on FLINK-7530: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5079 [FLINK-7530][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler` to new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that SubtaskMetrics can be queried in FLIP-6 standalone mode.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7530 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5079.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5079 commit 24d2ab0a600d476142cfadd453074b01506ba591 Author: gyao Date: 2017-11-27T12:30:18Z [FLINK-7530][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint Migrate logic from org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler to new handler. Add new handler to DispatcherRestEndpoint. commit 2ef55c4d4e94d287f5e8b924e35c79bf53ab957a Author: gyao Date: 2017-11-27T12:32:52Z [hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler commit b95b364640b43ba90f62d8c82a6d6c7f93e4ce65 Author: gyao Date: 2017-11-27T12:34:02Z [hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest > Port existing REST handlers to support Flip-6 components > > > Key: FLINK-7530 > URL: https://issues.apache.org/jira/browse/FLINK-7530 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > This is the umbrella issue for porting the existing REST handlers to work > together with the new {{RestServerEndpoint}} and the {{AbstractRestHandler}}. > This is the requirement to make them work with the Flip-6 {{Dispatcher}} and > {{JobMaster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5079: [FLINK-7530][flip6] Migrate SubtaskMetricsHandler ...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5079 [FLINK-7530][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler` to new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that SubtaskMetrics can be queried in FLIP-6 standalone mode.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7530 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5079.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5079 commit 24d2ab0a600d476142cfadd453074b01506ba591 Author: gyao Date: 2017-11-27T12:30:18Z [FLINK-7530][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint Migrate logic from org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler to new handler. Add new handler to DispatcherRestEndpoint. commit 2ef55c4d4e94d287f5e8b924e35c79bf53ab957a Author: gyao Date: 2017-11-27T12:32:52Z [hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler commit b95b364640b43ba90f62d8c82a6d6c7f93e4ce65 Author: gyao Date: 2017-11-27T12:34:02Z [hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest ---
[jira] [Created] (FLINK-8159) Add rich support for SelectWrapper and FlatSelectWrapper
Dian Fu created FLINK-8159: -- Summary: Add rich support for SelectWrapper and FlatSelectWrapper Key: FLINK-8159 URL: https://issues.apache.org/jira/browse/FLINK-8159 Project: Flink Issue Type: Sub-task Components: CEP Reporter: Dian Fu Assignee: Dian Fu {{SelectWrapper}} and {{FlatSelectWrapper}} should extend {{AbstractRichFunction}} and behave properly if the underlying functions extend {{RichFunction}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
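The forwarding pattern the issue asks for can be sketched with simplified stand-ins for Flink's function interfaces (not the real API): the wrapper extends the rich base class and forwards lifecycle calls only when the wrapped function is itself rich.

```java
// Simplified stand-ins for Flink's function interfaces (not the real API),
// sketching how a wrapper can forward rich-function lifecycle calls.
interface Function {}

interface RichFunction extends Function {
    void open();
    void close();
}

abstract class AbstractRichFunction implements RichFunction {
    public void open() {}
    public void close() {}
}

// A select-style wrapper: if the wrapped function is rich, forward open()/close().
class SelectWrapper extends AbstractRichFunction {
    private final Function wrapped;

    SelectWrapper(Function wrapped) { this.wrapped = wrapped; }

    @Override
    public void open() {
        if (wrapped instanceof RichFunction) ((RichFunction) wrapped).open();
    }

    @Override
    public void close() {
        if (wrapped instanceof RichFunction) ((RichFunction) wrapped).close();
    }
}
```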
[jira] [Created] (FLINK-8158) Rowtime window inner join emits late data
Hequn Cheng created FLINK-8158: -- Summary: Rowtime window inner join emits late data Key: FLINK-8158 URL: https://issues.apache.org/jira/browse/FLINK-8158 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Hequn Cheng Assignee: Hequn Cheng When executing the join, the join operator needs to make sure that no late data is emitted. Currently, this is achieved by holding back watermarks. However, the window border is not handled correctly. For the SQL below: {code} val sqlQuery = """ |SELECT t2.key, t2.id, t1.id |FROM T1 as t1 join T2 as t2 ON | t1.key = t2.key AND | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND | t2.rt + INTERVAL '1' SECOND |""".stripMargin val data1 = new mutable.MutableList[(String, String, Long)] // for boundary test data1.+=(("A", "LEFT1", 6000L)) val data2 = new mutable.MutableList[(String, String, Long)] data2.+=(("A", "RIGHT1", 6000L)) {code} The join will output a watermark with timestamp 1000, but if the left side then receives another record ("A", "LEFT1", 1000L), the join will output a record with timestamp 1000, which equals the previously emitted watermark. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
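The boundary problem can be restated with a tiny helper: with the 5-second look-back above, an input watermark of 6000 must not advance the output watermark past 999, because emitting 1000 makes the later LEFT1 record at timestamp 1000 late. A sketch (names and the exact off-by-one handling are illustrative, not Flink's implementation):

```java
// Illustrative sketch of the watermark hold-back described in FLINK-8158;
// this is not Flink's actual join operator code.
class JoinWatermark {
    // For a condition t1.rt BETWEEN t2.rt - lookBackMs AND t2.rt + ..., rows up
    // to lookBackMs older than the other side's watermark can still join. A
    // record whose timestamp merely EQUALS the output watermark is already too
    // late, so one extra millisecond must be subtracted at the window border.
    static long safeOutputWatermark(long inputWatermark, long lookBackMs) {
        return inputWatermark - lookBackMs - 1;
    }
}
```

With inputWatermark = 6000 and lookBackMs = 5000, the safe output watermark is 999, so a left record at timestamp 1000 is still strictly ahead of it and not late.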
[GitHub] flink pull request #5082: [FLINK-8143][flip6] Migrate SubtaskMetricsHandler ...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5082 [FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler` to new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that SubtaskMetrics can be queried in FLIP-6 standalone mode.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8143 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5082.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5082 commit d32d978e1826df6fab8f5b4a27d47b4367d54ece Author: gyao Date: 2017-11-27T12:30:18Z [FLINK-8150][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint Migrate logic from org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler to new handler. Add new handler to DispatcherRestEndpoint. commit 96c9f671d6320d5d1d014d5df27f4a3a07417993 Author: gyao Date: 2017-11-27T12:32:52Z [hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler commit c369d733fecc71a7581b5a6cbf196b35ecdfed12 Author: gyao Date: 2017-11-27T12:34:02Z [hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest ---
[GitHub] flink pull request #5079: [FLINK-8143][flip6] Migrate SubtaskMetricsHandler ...
Github user GJL closed the pull request at: https://github.com/apache/flink/pull/5079 ---
[GitHub] flink issue #5049: [FLINK-8081][metrics] Annotate 'MetricRegistry#getReporte...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5049 merging. ---
[jira] [Commented] (FLINK-8081) Annotate MetricRegistry#getReporters() with @VisibleForTesting
[ https://issues.apache.org/jira/browse/FLINK-8081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266693#comment-16266693 ] ASF GitHub Bot commented on FLINK-8081: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5049 merging. > Annotate MetricRegistry#getReporters() with @VisibleForTesting > -- > > Key: FLINK-8081 > URL: https://issues.apache.org/jira/browse/FLINK-8081 > Project: Flink > Issue Type: Improvement > Components: Metrics, Tests >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > > {{MetricRegistry#getReporters()}} is only used for testing purposes to > provide access to instantiated reporters. We should annotate this method with > {{@VisibleForTesting}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
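The change amounts to marking a test-only accessor. A minimal sketch with a stand-in for Flink's own `@VisibleForTesting` marker (a documentation-only annotation with no runtime behaviour; the class and field names here are made up):

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's @VisibleForTesting: signals that a member is exposed
// only so tests can reach it, not as part of the production contract.
@Documented
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@Retention(RetentionPolicy.RUNTIME)
@interface VisibleForTesting {}

class MetricRegistryExample {
    private final List<String> reporters = new ArrayList<>();

    @VisibleForTesting
    List<String> getReporters() { // widened visibility exists only for tests
        return reporters;
    }
}
```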
[GitHub] flink pull request #5068: [FLINK-8122] [table] Name all built-in table sinks...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5068#discussion_r153179456 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -60,4 +60,14 @@ trait TableSource[T] { */ def explainSource(): String = "" + /** +* Gets the name which is used by the visualization and logging during runtime. +* +* @return Name of the [[TableSource]]. +*/ + def getRuntimeName(): String = { --- End diff -- Hi @fhueske, thanks for the suggestion. I'll remove this method and add a default implementation for `explainSource()`. ---
[jira] [Commented] (FLINK-8122) Name all table sinks and sources
[ https://issues.apache.org/jira/browse/FLINK-8122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266715#comment-16266715 ] ASF GitHub Bot commented on FLINK-8122: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5068#discussion_r153179456 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -60,4 +60,14 @@ trait TableSource[T] { */ def explainSource(): String = "" + /** +* Gets the name which is used by the visualization and logging during runtime. +* +* @return Name of the [[TableSource]]. +*/ + def getRuntimeName(): String = { --- End diff -- Hi @fhueske, thanks for the suggestion. I'll remove this method and add a default implementation for `explainSource()`. > Name all table sinks and sources > > > Key: FLINK-8122 > URL: https://issues.apache.org/jira/browse/FLINK-8122 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Not all table sink and sources have proper names. Therefore, they are > displayed as "Unnamed" in the logs and Web UI (e.g. CsvTableSink). We should > add names for all built-in connectors. Having information about the table > sink name (via {{INSERT INTO}}) would be even better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
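The default `explainSource()` the commenter proposes could look roughly like this, shown here with a simplified Java stand-in for the Scala trait (not the actual Flink change):

```java
// Simplified stand-in for the TableSource trait: a default explainSource()
// derived from the class name, so built-in sources no longer show up as
// "Unnamed" in the logs and Web UI.
interface TableSource<T> {
    default String explainSource() {
        return getClass().getSimpleName();
    }
}

// A built-in source gets a usable name without any extra code.
class CsvTableSource implements TableSource<String> {}
```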
[GitHub] flink pull request #5081: [FLINK-7530][flip6] Migrate TaskManagerMetricsHand...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5081 [FLINK-7530][flip6] Migrate TaskManagerMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler` to new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that TaskManager metrics can be queried in FLIP-6 standalone mode. Note that the task manager ids exposed by the WebUI in FLIP-6 mode are currently broken (https://issues.apache.org/jira/browse/FLINK-8150). I had to obtain valid ids by setting a breakpoint in `ResourceManager`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7530-tm-metrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5081.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5081 commit 71ee76139e0703bea74623353c7175533582aa16 Author: gyao Date: 2017-11-27T12:57:48Z [FLINK-7530][flip6] Migrate TaskManagerMetricsHandler to new RestServerEndpoint Migrate logic in org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler to new handler, and add new handler to DispatcherRestEndpoint. ---
[jira] [Commented] (FLINK-8154) JobSubmissionClientActor submited job,but there is no connection to a JobManager
[ https://issues.apache.org/jira/browse/FLINK-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266782#comment-16266782 ] Patrick Lucas commented on FLINK-8154: -- Is `act-monitor-flink-jobmanager` a Kubernetes Service? If so, it's looking like the Service's IP is `10.3.0.81`, but the Pod's IP is `10.2.43.51`. Depending on your network configuration, these may not be routable to each other. Can you confirm connectivity by, for example, running `nc -v 10.2.43.51 6123` from within the Pod? (You may have to install `netcat`) > JobSubmissionClientActor submited job,but there is no connection to a > JobManager > - > > Key: FLINK-8154 > URL: https://issues.apache.org/jira/browse/FLINK-8154 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.2 > Environment: Kubernetes 1.8.3, Platform "Linux/amd64" >Reporter: Gregory Melekh >Priority: Blocker > > There is JobManager log file bellow. > 2017-11-26 08:17:13,435 INFO org.apache.flink.client.CliFrontend > - > > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - Starting Command Line Client (Version: 1.3.2, Rev:0399bee, > Date:03.08.2017 @ 10:23:11 UTC) > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - Current user: root > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - > 1.8/25.131-b11 > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - Maximum heap size: 6252 MiBytes > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre > 2017-11-26 08:17:13,439 INFO org.apache.flink.client.CliFrontend > - Hadoop version: 2.7.2 > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - JVM Options: > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - > -Dlog.file=/opt/flink/log/flink--client-act-monitor-flink-jobmanager-66cd4bdb5c-8kxbh.log > 2017-11-26 08:17:13,440 
INFO org.apache.flink.client.CliFrontend > - -Dlog4j.configuration=file:/etc/flink/log4j-cli.properties > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - -Dlogback.configurationFile=file:/etc/flink/logback.xml > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - Program Arguments: > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - run > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - -c > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - monitoring.flow.AccumulateAll > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - /tmp/monitoring-0.0.1-SNAPSHOT.jar > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - Classpath: > /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar::: > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - > > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - Using configuration directory /etc/flink > 2017-11-26 08:17:13,441 INFO org.apache.flink.client.CliFrontend > - Trying to load configuration file > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: blob.server.port, 6124 > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: jobmanager.rpc.address, act-monitor-flink-jobmanager > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: jobmanager.rpc.port, 6123 > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: jobmanager.heap.mb, 1024 > 2017-11-26 08:17:13,443 INFO >
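The connectivity check Patrick suggests (`nc -v <pod-ip> 6123`) can also be done from Java when `netcat` is unavailable in the container; a small stand-in (illustrative helper, not part of Flink):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// A small stand-in for `nc -v <host> <port>`: returns true if a TCP connection
// to host:port can be established within timeoutMs, false otherwise.
class PortCheck {
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Running it against both the Service IP and the Pod IP on port 6123 would show whether the two are routable from the client's network.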
[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/5032 Thanks for the suggestion @aljoscha. The problem is that the state type is provided via a generic type parameter `S extends State`, which is erased at runtime. Thus it is hard to do type checking in `AbstractKeyedStateBackend` unless we explicitly store and check the type for each state name (and that may affect performance). The existing "leave alone" solution seems to be the most efficient way, but we can only get a `ClassCastException` with that. What do you think? ---
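The alternative the commenter weighs (explicitly storing and checking a type per state name, since the generic parameter is erased) might be sketched like this; the names are made up, and this is not Flink's `AbstractKeyedStateBackend`:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: remember the state class each name was first registered
// with and fail fast with a descriptive error on a mismatched later request,
// instead of letting the caller hit a bare ClassCastException.
class StateTypeRegistry {
    private final Map<String, Class<?>> typesByName = new HashMap<>();

    <S> S getState(String name, Class<S> stateClass, S state) {
        Class<?> previous = typesByName.putIfAbsent(name, stateClass);
        if (previous != null && !previous.equals(stateClass)) {
            throw new IllegalStateException("State '" + name + "' was registered as "
                + previous.getSimpleName() + " but requested as " + stateClass.getSimpleName());
        }
        return state; // per-name lookup/creation is elided in this sketch
    }
}
```

The extra map lookup per state access is the performance cost the comment alludes to.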
[GitHub] flink pull request #5078: [FLINK-8022][kafka] Bump at-least-once timeout in ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5078 ---
[jira] [Commented] (FLINK-8022) Kafka at-least-once tests fail occasionally
[ https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266694#comment-16266694 ] ASF GitHub Bot commented on FLINK-8022: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5078 Thanks! I merged this > Kafka at-least-once tests fail occasionally > --- > > Key: FLINK-8022 > URL: https://issues.apache.org/jira/browse/FLINK-8022 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > Fix For: 1.5.0 > > > {{Kafka09ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}} > seems to sporadically fail with missing data, like this execution: > {code} > > Test > testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase) > is running. > > 17:54:30,195 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - > Creating topic oneToOneTopicRegularSink > 17:54:30,196 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - In > getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436 > 17:54:30,204 INFO org.I0Itec.zkclient.ZkEventThread >- Starting ZkClient event thread. > 17:54:30,204 INFO org.I0Itec.zkclient.ZkClient >- Waiting for keeper state SyncConnected > 17:54:30,240 INFO org.I0Itec.zkclient.ZkClient >- zookeeper state changed (SyncConnected) > 17:54:30,261 INFO org.I0Itec.zkclient.ZkEventThread >- Terminate ZkClient event thread. 
> 17:54:30,265 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - Topic > oneToOneTopicRegularSink create request is successfully posted > 17:54:30,366 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - > Validating if the topic oneToOneTopicRegularSink has been created or not > 17:54:30,373 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - In > getZKUtils:: zookeeperConnectionString = 127.0.0.1:39436 > 17:54:30,374 INFO org.I0Itec.zkclient.ZkEventThread >- Starting ZkClient event thread. > 17:54:30,374 INFO org.I0Itec.zkclient.ZkClient >- Waiting for keeper state SyncConnected > 17:54:30,404 INFO org.I0Itec.zkclient.ZkClient >- zookeeper state changed (SyncConnected) > 17:54:30,420 INFO > org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - topic > oneToOneTopicRegularSink has been created successfully > 17:54:30,421 INFO org.I0Itec.zkclient.ZkEventThread >- Terminate ZkClient event thread. 
> 17:54:31,099 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - > Starting FlinkKafkaProducer (1/1) to produce into default topic > oneToOneTopicRegularSink > 17:55:05,229 ERROR > org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase - > > Test > testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.Kafka09ProducerITCase) > failed with: > java.lang.AssertionError: Expected to contain all of: <[0, 1, 2, 3, 4, 5, 6, > 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, > 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, > 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, > 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, > 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, > 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, > 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, > 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, > 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, > 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, > 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, > 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, > 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, > 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, > 237, 238, 239, 240, 241, 242, 243,
[jira] [Closed] (FLINK-8022) Kafka at-least-once tests fail occasionally
[ https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-8022. --- Resolution: Fixed Assignee: Piotr Nowojski (was: Tzu-Li (Gordon) Tai) Fixed on master in da38a219a9abf31f53318b2a902bd064bfd0a775 Fixed on release-1.4 in 4a46507e980b880a20bd821ee55d51bc787df124
[jira] [Commented] (FLINK-8022) Kafka at-least-once tests fail occasionally
[ https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266696#comment-16266696 ] ASF GitHub Bot commented on FLINK-8022: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5078
[GitHub] flink issue #5078: [FLINK-8022][kafka] Bump at-least-once timeout in tests
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5078 Thanks! I merged this 👍 ---
[jira] [Closed] (FLINK-8153) Upgrade to JDK 9
[ https://issues.apache.org/jira/browse/FLINK-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8153. --- Resolution: Later There are no plans at the moment to fully switch to JDK 9, that is, dropping Java 8. We are, however, working on making Flink build/run on Java 9; see FLINK-8033. > Upgrade to JDK 9 > > > Key: FLINK-8153 > URL: https://issues.apache.org/jira/browse/FLINK-8153 > Project: Flink > Issue Type: Improvement > Environment: Development >Reporter: Paul Meshkovsky >Priority: Minor > > Guys any plans to upgrade to JDK 9? ... Maybe it should be considered as part > of Flink 2.0 release... Thanks... Example some of old connector technologies > can be dropped I think Kafka 1.0 client should be compatible with lower > versions of Kafka.. I am new to flink so just asking -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8160) Extend OperatorHarness to expose metrics
Chesnay Schepler created FLINK-8160: --- Summary: Extend OperatorHarness to expose metrics Key: FLINK-8160 URL: https://issues.apache.org/jira/browse/FLINK-8160 Project: Flink Issue Type: Improvement Components: Metrics, Streaming Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.5.0 To better test interactions between operators and metrics, the harness should expose the metrics registered by the operator. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
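A hedged sketch of what "exposing metrics" from a test harness could look like: the harness hands the operator a metric group that records every registration, so a test can assert on them afterwards. All types below are simplified stand-ins, not Flink's actual `MetricGroup`/`Counter` API.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for a counter metric (illustrative, not Flink's Counter).
class Counter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// A metric group that remembers every counter registered through it,
// which is what a harness would expose to tests.
class RecordingMetricGroup {
    private final Map<String, Counter> counters = new HashMap<>();

    /** Called by the operator under test to register/look up a counter. */
    Counter counter(String name) {
        return counters.computeIfAbsent(name, n -> new Counter());
    }

    /** Called by the test to inspect what the operator registered. */
    Counter getRegisteredCounter(String name) {
        return counters.get(name);
    }
}
```

A test would run the operator against the harness, then fetch `getRegisteredCounter("numRecordsIn")` (name hypothetical) and assert on its count.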
[jira] [Assigned] (FLINK-5789) Make Bucketing Sink independent of Hadoop's FileSystem
[ https://issues.apache.org/jira/browse/FLINK-5789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-5789: --- Assignee: (was: Gary Yao) > Make Bucketing Sink independent of Hadoop's FileSystem > -- > > Key: FLINK-5789 > URL: https://issues.apache.org/jira/browse/FLINK-5789 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0, 1.1.4 >Reporter: Stephan Ewen > Fix For: 1.5.0 > > > The {{BucketingSink}} is hard-wired to Hadoop's FileSystem, bypassing Flink's > file system abstraction. > This causes several issues: > - The bucketing sink behaves differently than other file sinks with > respect to configuration > - Directly supported file systems (not through Hadoop) like the MapR File > System do not work in the same way with the BucketingSink as other file > systems > - The previous point is all the more problematic in the effort to make > Hadoop an optional dependency and in other stacks (Mesos, Kubernetes, > AWS, GCE, Azure) that ideally have no Hadoop dependency. > We should port the {{BucketingSink}} to use Flink's FileSystem classes. > To support the *truncate* functionality that is needed for the exactly-once > semantics of the Bucketing Sink, we should extend Flink's FileSystem > abstraction to have the methods > - {{boolean supportsTruncate()}} > - {{void truncate(Path, long)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
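The two methods proposed in the ticket can be sketched as follows. This is an illustration of the proposed extension only: the surrounding `FileSystem` type is a simplified stand-in, not Flink's actual class, and the `String` path parameter stands in for Flink's `Path`.

```java
import java.io.IOException;

// Simplified stand-in for Flink's FileSystem abstraction, extended with the
// capability check and truncate operation proposed in FLINK-5789.
abstract class FileSystem {
    /** Whether this file system can truncate files (needed for exactly-once bucketing). */
    public boolean supportsTruncate() {
        return false; // conservative default for file systems without truncate support
    }

    /** Truncate the file at the given path to the desired length. */
    public void truncate(String path, long desiredLength) throws IOException {
        throw new UnsupportedOperationException(
            "truncate is not supported by " + getClass().getSimpleName());
    }
}

// A file system that does support truncation overrides both methods,
// so a sink can probe the capability before relying on it.
class TruncatingFileSystem extends FileSystem {
    @Override public boolean supportsTruncate() { return true; }
    @Override public void truncate(String path, long desiredLength) { /* delegate to the actual FS */ }
}
```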
[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5032 We could check based on the state descriptor. ---
[GitHub] flink pull request #5049: [FLINK-8081][metrics] Annotate 'MetricRegistry#get...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5049 ---
[jira] [Closed] (FLINK-8081) Annotate MetricRegistry#getReporters() with @VisibleForTesting
[ https://issues.apache.org/jira/browse/FLINK-8081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8081. --- Resolution: Fixed Fix Version/s: 1.4.1 1.5.0 1.4: bd1e03374d0f240f5a0b406ace662e4391cce1d5 master: fbddf35faf59d5b762a71d42155852b29ef0da48 > Annotate MetricRegistry#getReporters() with @VisibleForTesting > -- > > Key: FLINK-8081 > URL: https://issues.apache.org/jira/browse/FLINK-8081 > Project: Flink > Issue Type: Improvement > Components: Metrics, Tests >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.5.0, 1.4.1 > > > {{MetricRegistry#getReporters()}} is only used for testing purposes to > provide access to instantiated reporters. We should annotate this method with > {{@VisibleForTesting}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8081) Annotate MetricRegistry#getReporters() with @VisibleForTesting
[ https://issues.apache.org/jira/browse/FLINK-8081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266699#comment-16266699 ] ASF GitHub Bot commented on FLINK-8081: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5049 > Annotate MetricRegistry#getReporters() with @VisibleForTesting > -- > > Key: FLINK-8081 > URL: https://issues.apache.org/jira/browse/FLINK-8081 > Project: Flink > Issue Type: Improvement > Components: Metrics, Tests >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.5.0, 1.4.1 > > > {{MetricRegistry#getReporters()}} is only used for testing purposes to > provide access to instantiated reporters. We should annotate this method with > {{@VisibleForTesting}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5077: [docs] fix wrong package name
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5077 nice catch, merging. ---
[jira] [Commented] (FLINK-7530) Port existing REST handlers to support Flip-6 components
[ https://issues.apache.org/jira/browse/FLINK-7530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266765#comment-16266765 ] ASF GitHub Bot commented on FLINK-7530: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5081 [FLINK-7530][flip6] Migrate TaskManagerMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler` to the new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that TaskManager metrics can be queried in FLIP-6 standalone mode. Note that the task manager ids exposed by the WebUI in FLIP-6 mode are currently broken (https://issues.apache.org/jira/browse/FLINK-8150). I had to obtain valid ids by setting a breakpoint in `ResourceManager`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7530-tm-metrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5081.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5081 commit 71ee76139e0703bea74623353c7175533582aa16 Author: gyaoDate: 2017-11-27T12:57:48Z [FLINK-7530][flip6] Migrate TaskManagerMetricsHandler to new RestServerEndpoint Migrate logic in org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler to new handler, and add new handler to DispatcherRestEndpoint. > Port existing REST handlers to support Flip-6 components > > > Key: FLINK-7530 > URL: https://issues.apache.org/jira/browse/FLINK-7530 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > This is the umbrella issue for porting the existing REST handlers to work > together with the new {{RestServerEndpoint}} and the {{AbstractRestHandler}}. > This is the requirement to make them work with the Flip-6 {{Dispatcher}} and > {{JobMaster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7716) Port JobManagerMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266800#comment-16266800 ] ASF GitHub Bot commented on FLINK-7716: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5083 [FLINK-7716][flip6] Migrate JobManagerMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler` to the new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that JobManager metrics can be queried in FLIP-6 standalone mode.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7716 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5083.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5083 commit 17244d63fcc1610c83a5c020a63fe045087a2f07 Author: gyaoDate: 2017-11-27T13:18:48Z [FLIP-7716][flip6] Migrate JobManagerMetricsHandler to new RestServerEndpoint Migrate logic in org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler to new handler and add new handler to DispatcherRestEndpoint. commit 51d37ccce04b912dcda6f5ae74d8092d22b5ff9d Author: gyao Date: 2017-11-27T13:22:17Z [FLINK-7716][Javadoc] Deprecate method MetricStore#getJobManager(). There is a semantically equivalent method in MetricStore. commit c7283194b19719270e4c0c2cc185876e710b750a Author: gyao Date: 2017-11-27T13:24:42Z [hotfix][Javadoc] Fix typo in ConversionException > Port JobManagerMetricsHandler to new REST endpoint > -- > > Key: FLINK-7716 > URL: https://issues.apache.org/jira/browse/FLINK-7716 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{JobManagerMetricsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266805#comment-16266805 ] ASF GitHub Bot commented on FLINK-8090: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5032 Thanks for the suggestion @aljoscha. The problem is the state type is provided via a generic type parameter `S extends State`, which will be erased at runtime. Thus it's hard to do type checking in `AbstractKeyedStateBackend` unless we explicitly store and check the type for each state name (and that may affect the performance). The existing "leave alone" solution seems to be the most efficient way, but we can only get a `ClassCastException` with that. What do you think? > Improve error message when registering different states under the same name. > > > Key: FLINK-8090 > URL: https://issues.apache.org/jira/browse/FLINK-8090 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Xingcan Cui > > Currently a {{ProcessFunction}} like this: > {code} > final MapStateDescriptor<Integer, Tuple2<Integer, Long>> > firstMapStateDescriptor = new MapStateDescriptor<>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO, > source.getType()); > final ListStateDescriptor<Integer> secondListStateDescriptor = new > ListStateDescriptor<>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO); > new ProcessFunction<Tuple2<Integer, Long>, Object>() { > private static final long serialVersionUID = > -805125545438296619L; > private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState; > private transient ListState<Integer> secondListState; > @Override > public void open(Configuration parameters) > throws Exception { > super.open(parameters); > firstMapState = > getRuntimeContext().getMapState(firstMapStateDescriptor); > secondListState = > getRuntimeContext().getListState(secondListStateDescriptor); > } > @Override > public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception { > Tuple2<Integer, Long> v = > 
firstMapState.get(value.f0); > if (v == null) { > v = new Tuple2<>(value.f0, 0L); > } > firstMapState.put(value.f0, new > Tuple2<>(v.f0, v.f1 + value.f1)); > } > } > {code} > fails with: > {code} > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127) > at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ClassCastException: > org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to > org.apache.flink.api.common.state.ListState > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71) > ... 9 more > {code} > Which is cryptic, as it does not explain the reason for the problem. The > error message should be something along the lines of "Duplicate state name". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
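The check discussed in this thread, remembering which descriptor type each state name was first registered with and failing fast with a descriptive message instead of a late `ClassCastException`, can be sketched like this. This is an illustration only, not Flink's actual `AbstractKeyedStateBackend` code; the registry and its `Class<?>` keys are stand-ins for real state descriptors.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative duplicate-state-name guard: one map lookup per registration,
// trading a little bookkeeping for a clear error message.
class StateRegistry {
    private final Map<String, Class<?>> registeredDescriptors = new HashMap<>();

    /** Register a state name together with the descriptor type used to create it. */
    void register(String stateName, Class<?> descriptorType) {
        Class<?> previous = registeredDescriptors.putIfAbsent(stateName, descriptorType);
        if (previous != null && !previous.equals(descriptorType)) {
            throw new IllegalStateException(
                "Duplicate state name '" + stateName + "': already registered with "
                    + previous.getSimpleName() + ", now requested as "
                    + descriptorType.getSimpleName());
        }
    }
}
```

In the scenario from the issue, registering `"timon-one"` first as a map state and then as a list state would throw immediately with a "Duplicate state name" message.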
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266871#comment-16266871 ] ASF GitHub Bot commented on FLINK-8090: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5032 We could check based on the state descriptor.
[jira] [Updated] (FLINK-8158) Rowtime window inner join emits late data
[ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-8158: --- Description: When executing the join, the join operator needs to make sure that no late data is emitted. Currently, this is achieved by holding back watermarks. However, the window border is not handled correctly. For the SQL below: {quote} val sqlQuery = """ SELECT t2.key, t2.id, t1.id FROM T1 as t1 join T2 as t2 ON t1.key = t2.key AND t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND """.stripMargin val data1 = new mutable.MutableList[(String, String, Long)] // for boundary test data1.+=(("A", "LEFT1", 6000L)) val data2 = new mutable.MutableList[(String, String, Long)] data2.+=(("A", "RIGHT1", 6000L)) {quote} Join will output a watermark with timestamp 1000, but if the left side comes with another record ("A", "LEFT1", 1000L), join will output a record with timestamp 1000, which equals the previous watermark. was: When executing the join, the join operator needs to make sure that no late data is emitted. Currently, this is achieved by holding back watermarks. However, the window border is not handled correctly. For the SQL below: {quote} val sqlQuery = """ |SELECT t2.key, t2.id, t1.id |FROM T1 as t1 join T2 as t2 ON | t1.key = t2.key AND | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND |t2.rt + INTERVAL '1' SECOND |""".stripMargin val data1 = new mutable.MutableList[(String, String, Long)] // for boundary test data1.+=(("A", "LEFT1", 6000L)) val data2 = new mutable.MutableList[(String, String, Long)] data2.+=(("A", "RIGHT1", 6000L)) {quote} Join will output a watermark with timestamp 1000, but if the left side comes with another record ("A", "LEFT1", 1000L), join will output a record with timestamp 1000, which equals the previous watermark. 
> Rowtime window inner join emits late data > - > > Key: FLINK-8158 > URL: https://issues.apache.org/jira/browse/FLINK-8158 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
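The boundary case in this issue can be worked through numerically. Assuming the join holds the output watermark back by the window's maximum left-relative size (5000 ms for `t1.rt BETWEEN t2.rt - 5s AND t2.rt + 1s`), a result can carry a timestamp as low as input watermark minus 5000, so holding back by exactly 5000 lets a record with a timestamp *equal* to the emitted watermark slip out as late data. This sketch is an illustration of the arithmetic, not Flink's actual join operator code, and the extra-millisecond correction is one possible fix, not necessarily the one merged.

```java
// Hedged sketch of the off-by-one at the window border described in FLINK-8158.
class WatermarkHoldback {

    /** Naive holdback: for input watermark 6000 and max left lag 5000, emits 1000. */
    static long naiveHoldback(long inputWatermark, long maxLeftLag) {
        return inputWatermark - maxLeftLag;
    }

    /**
     * Holding back one extra millisecond keeps a boundary record (timestamp
     * exactly inputWatermark - maxLeftLag) strictly ahead of the watermark.
     */
    static long heldBackWatermark(long inputWatermark, long maxLeftLag) {
        return inputWatermark - maxLeftLag - 1;
    }
}
```

With the issue's data (right row at 6000, left row at 1000), the naive holdback emits watermark 1000 and then a joined record at 1000; the corrected holdback emits 999, so the record at 1000 is not late.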
[jira] [Commented] (FLINK-8022) Kafka at-least-once tests fail occasionally
[ https://issues.apache.org/jira/browse/FLINK-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266707#comment-16266707 ] ASF GitHub Bot commented on FLINK-8022: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5078 Thanks!
[jira] [Commented] (FLINK-8122) Name all table sinks and sources
[ https://issues.apache.org/jira/browse/FLINK-8122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266731#comment-16266731 ] ASF GitHub Bot commented on FLINK-8122: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5068#discussion_r153182297 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -60,4 +60,14 @@ trait TableSource[T] { */ def explainSource(): String = "" + /** +* Gets the name which is used by the visualization and logging during runtime. +* +* @return Name of the [[TableSource]]. +*/ + def getRuntimeName(): String = { --- End diff -- Shall I reserve the existing `explainSource()` implementations or use a unified one? > Name all table sinks and sources > > > Key: FLINK-8122 > URL: https://issues.apache.org/jira/browse/FLINK-8122 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Not all table sink and sources have proper names. Therefore, they are > displayed as "Unnamed" in the logs and Web UI (e.g. CsvTableSink). We should > add names for all built-in connectors. Having information about the table > sink name (via {{INSERT INTO}}) would be even better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
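The review question above is how a runtime name should relate to the existing `explainSource()` strings. A hedged Java sketch of one possible resolution (the fallback behavior here is an assumption for illustration, not the merged implementation, and the diff itself is in Scala):

```java
/**
 * Sketch of a runtime-name scheme: an optional human-readable explanation,
 * with the runtime name falling back to the simple class name when no
 * explanation is provided. This avoids "Unnamed" sinks/sources in the UI.
 */
public class RuntimeNameSketch {

    interface TableSource<T> {
        /** Optional human-readable description; empty by default. */
        default String explainSource() { return ""; }

        /** Name used for visualization and logging during runtime. */
        default String getRuntimeName() {
            String explanation = explainSource();
            return explanation.isEmpty() ? getClass().getSimpleName() : explanation;
        }
    }

    static class CsvTableSource implements TableSource<String> { }

    public static void main(String[] args) {
        TableSource<String> source = new CsvTableSource();
        System.out.println(source.getRuntimeName()); // prints "CsvTableSource"
    }
}
```

With this shape, sources that already override `explainSource()` keep their custom text, and every other source still gets a stable, non-empty name.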
[GitHub] flink pull request #5080: [FLINK-8159] [cep] Add rich support for SelectWrap...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5080 [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper ## What is the purpose of the change *This pull request adds rich support for SelectWrapper and FlatSelectWrapper. If the wrapped functions are rich functions, they should be processed correctly.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink SelectFunction Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5080.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5080 commit 4c3ccb008b38d44189578975b5eee9208561567b Author: Dian Fu Date: 2017-11-27T12:50:30Z [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper ---
[GitHub] flink pull request #5077: [docs] fix wrong package name
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5077 ---
[jira] [Commented] (FLINK-8143) Port SubtaskMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266771#comment-16266771 ] ASF GitHub Bot commented on FLINK-8143: --- Github user GJL closed the pull request at: https://github.com/apache/flink/pull/5079 > Port SubtaskMetricsHandler to new REST endpoint > --- > > Key: FLINK-8143 > URL: https://issues.apache.org/jira/browse/FLINK-8143 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Gary Yao >Assignee: Gary Yao > Labels: flip-6 > > Migrate logic in > {{org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler}} > to a handler that extends > {{org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}}. > Register new handler to {{DispatcherRestEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8143) Port SubtaskMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266772#comment-16266772 ] ASF GitHub Bot commented on FLINK-8143: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5082 [FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler to` new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that SubtaskMetrics can be queried in FLIP-6 standalone mode.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8143 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5082.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5082 commit d32d978e1826df6fab8f5b4a27d47b4367d54ece Author: gyaoDate: 2017-11-27T12:30:18Z [FLINK-8150][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint Migrate logic from org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler to new handler. Add new handler to DispatcherRestEndpoint. commit 96c9f671d6320d5d1d014d5df27f4a3a07417993 Author: gyao Date: 2017-11-27T12:32:52Z [hotfix][Javadoc] Remove wrong Javadoc from SubtaskMetricsHandler commit c369d733fecc71a7581b5a6cbf196b35ecdfed12 Author: gyao Date: 2017-11-27T12:34:02Z [hotfix][flip6] Add unit tests JobVertexMetricsHeadersTest > Port SubtaskMetricsHandler to new REST endpoint > --- > > Key: FLINK-8143 > URL: https://issues.apache.org/jira/browse/FLINK-8143 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Gary Yao >Assignee: Gary Yao > Labels: flip-6 > > Migrate logic in > {{org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler}} > to a handler that extends > {{org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}}. > Register new handler to {{DispatcherRestEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
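The metrics-handler migrations in this thread all follow the same shape: the legacy handler's lookup logic (resolve a scope, then fetch metrics within it) moves behind a shared abstract handler, and each concrete handler only supplies its scope. A toy sketch of that factoring, with plain maps standing in for Flink's metric store (none of these names are Flink's actual API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Illustrative sketch of shared scope-then-metric lookup logic that
 * concrete "handlers" (job, job manager, subtask, ...) can reuse.
 */
public class MetricsLookupSketch {

    private final Map<String, Map<String, Number>> store = new HashMap<>();

    void report(String scope, String metric, Number value) {
        store.computeIfAbsent(scope, s -> new HashMap<>()).put(metric, value);
    }

    /** Shared lookup: resolve the scope first, then the metric inside it. */
    Optional<Number> lookup(String scope, String metric) {
        return Optional.ofNullable(store.get(scope)).map(m -> m.get(metric));
    }

    public static void main(String[] args) {
        MetricsLookupSketch s = new MetricsLookupSketch();
        s.report("job.vertex.subtask-0", "numRecordsIn", 1234);
        System.out.println(s.lookup("job.vertex.subtask-0", "numRecordsIn").orElse(-1));
    }
}
```

The benefit, mirrored in the PRs above, is that each new endpoint registration adds only scope resolution rather than re-implementing the query path.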
[GitHub] flink pull request #5068: [FLINK-8122] [table] Name all built-in table sinks...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5068#discussion_r153182297 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -60,4 +60,14 @@ trait TableSource[T] { */ def explainSource(): String = "" + /** +* Gets the name which is used by the visualization and logging during runtime. +* +* @return Name of the [[TableSource]]. +*/ + def getRuntimeName(): String = { --- End diff -- Shall I keep the existing `explainSource()` implementations or use a unified one? ---
[GitHub] flink issue #5079: [FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new ...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5079 My branch name is wrong. The commit messages are right. Let me know if this is a problem. ---
[jira] [Commented] (FLINK-8143) Port SubtaskMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266768#comment-16266768 ] ASF GitHub Bot commented on FLINK-8143: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5079 My branch name is wrong. The commit messages are right. Let me know if this is a problem. > Port SubtaskMetricsHandler to new REST endpoint > --- > > Key: FLINK-8143 > URL: https://issues.apache.org/jira/browse/FLINK-8143 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Gary Yao >Assignee: Gary Yao > Labels: flip-6 > > Migrate logic in > {{org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler}} > to a handler that extends > {{org.apache.flink.runtime.rest.handler.job.metrics.AbstractMetricsHandler}}. > Register new handler to {{DispatcherRestEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5084: [FLINK-7694][flip6] Migrate JobMetricsHandler to n...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5084 [FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler to` new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that Job metrics can be queried in FLIP-6 standalone mode.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7694 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5084.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5084 commit 7a3bdc5a8c2c53a9b4219d1fab7faf5490d1ef92 Author: gyao Date: 2017-11-27T13:34:03Z [FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint Migrate logic in org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler to new handler, and add new handler to DispatcherRestEndpoint. ---
[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266819#comment-16266819 ] ASF GitHub Bot commented on FLINK-7694: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5084 [FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler to` new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that Job metrics can be queried in FLIP-6 standalone mode.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7694 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5084.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5084 commit 7a3bdc5a8c2c53a9b4219d1fab7faf5490d1ef92 Author: gyaoDate: 2017-11-27T13:34:03Z [FLINK-7694][flip6] Migrate JobMetricsHandler to new RestServerEndpoint Migrate logic in org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetrisHandler to new handler, and add new handler to DispatcherRestEndpoint. > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Bowen Li >Assignee: Gary Yao > Labels: flip6 > Fix For: 1.5.0 > > > Port > {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} to > new handler that works with {{RestServerEndpoint}}. Add new handler to > {{DispatcherRestEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5078: [FLINK-8022][kafka] Bump at-least-once timeout in ...
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5078 [FLINK-8022][kafka] Bump at-least-once timeout in tests Increasing the timeout for reading the records from 30s to 60s seems to solve the issue for the failing at-least-once tests. This is a minor fix in tests, intended to increase test stability. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f8022 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5078.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5078 commit 31a0f9ecb6f39210e2b7dacee9bcf90f44ceee1f Author: Piotr Nowojski Date: 2017-11-23T11:53:37Z [FLINK-8022][kafka] Bump at-least-once timeout in tests Increasing the timeout for reading the records from 30s to 60s seems to solve the issue for the failing at-least-once tests. ---
[GitHub] flink issue #5078: [FLINK-8022][kafka] Bump at-least-once timeout in tests
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5078 Thanks! ---
[jira] [Commented] (FLINK-8159) Add rich support for SelectWrapper and FlatSelectWrapper
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266750#comment-16266750 ] ASF GitHub Bot commented on FLINK-8159: --- GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5080 [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper ## What is the purpose of the change *This pull request adds rich support for SelectWrapper and FlatSelectWrapper. If the wrapped functions are rich functions, they should be processed correctly.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink SelectFunction Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5080.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5080 commit 4c3ccb008b38d44189578975b5eee9208561567b Author: Dian Fu Date: 2017-11-27T12:50:30Z [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper > Add rich support for SelectWrapper and FlatSelectWrapper > > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extend > {{AbstractRichFunction}} and process properly if the underlying functions > extend RichFunction. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
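The delegation pattern the issue asks for can be sketched with minimal stand-in types (these are hypothetical classes, not the actual CEP `SelectWrapper`/`FlatSelectWrapper`): the wrapper is itself a rich function and forwards lifecycle calls to the wrapped function only when that function is rich too.

```java
/**
 * Sketch of rich-lifecycle forwarding: the wrapper implements the rich
 * interface and delegates open()/close() to the wrapped function if and
 * only if the wrapped function is also rich.
 */
public class RichWrapperSketch {

    interface Function { }

    interface RichFunction extends Function {
        void open();
        void close();
    }

    static class SelectWrapper implements RichFunction {
        private final Function wrapped;

        SelectWrapper(Function wrapped) { this.wrapped = wrapped; }

        @Override public void open() {
            if (wrapped instanceof RichFunction) { ((RichFunction) wrapped).open(); }
        }

        @Override public void close() {
            if (wrapped instanceof RichFunction) { ((RichFunction) wrapped).close(); }
        }
    }

    static class CountingRich implements RichFunction {
        int opens;
        @Override public void open() { opens++; }
        @Override public void close() { }
    }

    public static void main(String[] args) {
        CountingRich inner = new CountingRich();
        SelectWrapper wrapper = new SelectWrapper(inner);
        wrapper.open();
        System.out.println(inner.opens); // prints 1
    }
}
```

Without this forwarding, a rich user function wrapped by the CEP operator would never see its open()/close() calls, which is the bug the PR addresses.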
[jira] [Assigned] (FLINK-8160) Extend OperatorHarness to expose metrics
[ https://issues.apache.org/jira/browse/FLINK-8160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-8160: --- Assignee: (was: Chesnay Schepler) > Extend OperatorHarness to expose metrics > > > Key: FLINK-8160 > URL: https://issues.apache.org/jira/browse/FLINK-8160 > Project: Flink > Issue Type: Improvement > Components: Metrics, Streaming >Reporter: Chesnay Schepler > Fix For: 1.5.0 > > > To better test interactions between operators and metrics the harness should > expose the metrics registered by the operator. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5083: [FLINK-7716][flip6] Migrate JobManagerMetricsHandl...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5083 [FLINK-7716][flip6] Migrate JobManagerMetricsHandler to new RestServerEndpoint ## What is the purpose of the change *FLIP-6 efforts: Migrating HTTP handlers* ## Brief change log - Migrate logic from `org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler to` new handler. - Add new handler to `DispatcherRestEndpoint`. ## Verifying this change - *Added unit tests for all new classes and modified existing classes except for DispatcherRestEndpoint.* - *Manually deployed a job locally and verified with `curl` that JobManager metrics can be queried in FLIP-6 standalone mode.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-7716 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5083.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5083 commit 17244d63fcc1610c83a5c020a63fe045087a2f07 Author: gyaoDate: 2017-11-27T13:18:48Z [FLIP-7716][flip6] Migrate JobManagerMetricsHandler to new RestServerEndpoint Migrate logic in org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler to new handler and add new handler to DispatcherRestEndpoint. commit 51d37ccce04b912dcda6f5ae74d8092d22b5ff9d Author: gyao Date: 2017-11-27T13:22:17Z [FLINK-7716][Javadoc] Deprecate method MetricStore#getJobManager(). There is a semantically equivalent method in MetricStore. commit c7283194b19719270e4c0c2cc185876e710b750a Author: gyao Date: 2017-11-27T13:24:42Z [hotfix][Javadoc] Fix typo in ConversionException ---
[GitHub] flink issue #4581: [FLINK-7499][io] fix double buffer release in SpillableSu...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4581 Still failing with some checkstyle violations in `ResultPartition.java` ---
[jira] [Commented] (FLINK-8154) JobSubmissionClientActor submited job,but there is no connection to a JobManager
[ https://issues.apache.org/jira/browse/FLINK-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266949#comment-16266949 ] Gregory Melekh commented on FLINK-8154: --- Ohhh...You are right... I should create headless service to redirect messages to Jobmanager Pod. Thanks > JobSubmissionClientActor submited job,but there is no connection to a > JobManager > - > > Key: FLINK-8154 > URL: https://issues.apache.org/jira/browse/FLINK-8154 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.2 > Environment: Kubernetes 1.8.3, Platform "Linux/amd64" >Reporter: Gregory Melekh >Priority: Blocker > > There is JobManager log file bellow. > 2017-11-26 08:17:13,435 INFO org.apache.flink.client.CliFrontend > - > > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - Starting Command Line Client (Version: 1.3.2, Rev:0399bee, > Date:03.08.2017 @ 10:23:11 UTC) > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - Current user: root > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - > 1.8/25.131-b11 > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - Maximum heap size: 6252 MiBytes > 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend > - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre > 2017-11-26 08:17:13,439 INFO org.apache.flink.client.CliFrontend > - Hadoop version: 2.7.2 > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - JVM Options: > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - > -Dlog.file=/opt/flink/log/flink--client-act-monitor-flink-jobmanager-66cd4bdb5c-8kxbh.log > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - -Dlog4j.configuration=file:/etc/flink/log4j-cli.properties > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - -Dlogback.configurationFile=file:/etc/flink/logback.xml > 2017-11-26 08:17:13,440 
INFO org.apache.flink.client.CliFrontend > - Program Arguments: > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - run > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - -c > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - monitoring.flow.AccumulateAll > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - /tmp/monitoring-0.0.1-SNAPSHOT.jar > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - Classpath: > /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar::: > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - > > 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend > - Using configuration directory /etc/flink > 2017-11-26 08:17:13,441 INFO org.apache.flink.client.CliFrontend > - Trying to load configuration file > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: blob.server.port, 6124 > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: jobmanager.rpc.address, act-monitor-flink-jobmanager > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: jobmanager.rpc.port, 6123 > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: jobmanager.heap.mb, 1024 > 2017-11-26 08:17:13,443 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: jobmanager.web.log.path, > /etc/flink/log/act-monitor-flink-jobmanager.log > 2017-11-26 08:17:13,444 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration
[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153241166 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle; +import org.apache.flink.runtime.checkpoint.CheckpointCache; +import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both remote end (e.g:DFS) and local end. + * Local data is managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache} and {@link CheckpointStreamFactory} and + * create a hybrid output stream by {@link CheckpointCache} and {@link CheckpointStreamFactory}, this hybrid output stream will write + * to both remote end and local end. 
+ */ +public class CachedCheckpointStreamFactory implements CheckpointStreamFactory { + + private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class); + + private final CheckpointCache cache; + private final CheckpointStreamFactory remoteFactory; + + public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) { + this.cache = cache; + this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null."); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception { + return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false); + } + + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder); + } + CachedOutputStream cachedOut = null; + if (cache != null) { + cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder); + } + CheckpointStateOutputStream remoteOut = null; + if (!placeholder) { + remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp); + } + CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut); + return output; + } + + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + LOG.warn("create output stream which is not cacheable."); + return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp); + } + + @Override + public void close() throws Exception { + remoteFactory.close(); + } + + /** +* A hybrid checkpoint output stream which write data to both remote end and local end, +* writing data locally failed won't stop writing to remote. 
This hybrid output stream +* will return a {@link CachedStreamStateHandle} in closeAndGetHandle(), it can be used for read data locally. +*/ + public static class CachedCheckpointStateOutputStream extends CheckpointStateOutputStream { + + private CachedOutputStream cacheOut = null; + private CheckpointStateOutputStream remoteOut = null; + +
[jira] [Commented] (FLINK-8161) Flakey YARNSessionCapacitySchedulerITCase on Travis
[ https://issues.apache.org/jira/browse/FLINK-8161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267023#comment-16267023 ] ASF GitHub Bot commented on FLINK-8161: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5085 [FLINK-8161] [tests] Harden YARNSessionCapacitySchedulerITCase ## What is the purpose of the change Add "Remote connection to [null] failed with java.nio.channels.NotYetConnectedException" to the list of whitelisted log statements in YarnTestBase. This logging statement seems to appear since we moved from Flakka to Akka 2.4.0. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixYarnSessionCapacitySchedulerITCase Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5085.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5085 commit eede21bc1ee335a913d1a49d82d73ce1d0cfe3aa Author: Till RohrmannDate: 2017-11-27T16:31:28Z [FLINK-8161] [tests] Harden YARNSessionCapacitySchedulerITCase Add "Remote connection to [null] failed with java.nio.channels.NotYetConnectedException" to the list of whitelisted log statements in YarnTestBase. This logging statement seems to appear since we moved from Flakka to Akka 2.4.0. > Flakey YARNSessionCapacitySchedulerITCase on Travis > --- > > Key: FLINK-8161 > URL: https://issues.apache.org/jira/browse/FLINK-8161 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.5.0 > > > The {{YARNSessionCapacitySchedulerITCase}} spuriously fails on Travis because > it now contains {{2017-11-25 22:49:49,204 WARN > akka.remote.transport.netty.NettyTransport- Remote > connection to [null] failed with java.nio.channels.NotYetConnectedException}} > from time to time in the logs. I suspect that this is due to switching from > Flakka to Akka 2.4.0. In order to solve this problem I propose to add this > log statement to the whitelisted log statements. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8088) Bind logical slots to their request id instead of the slot allocation id
[ https://issues.apache.org/jira/browse/FLINK-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267094#comment-16267094 ] ASF GitHub Bot commented on FLINK-8088: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5089 [FLINK-8088] Associate logical slots with the slot request id

## What is the purpose of the change

Before, logical slots like the `SimpleSlot` and `SharedSlot` were associated with the actually allocated slot via the `AllocationID`. This, however, was sub-optimal because allocated slots can be re-used to also fulfill other slot requests (logical slots). Therefore, we should bind the logical slots to the id with the right lifecycle, which is the `SlotRequestID`. This PR is based on #5088.

## Brief change log

- Introduce `SlotRequestID`
- Associate logical slot requests with a `SlotRequestID`, which is valid over the lifetime of a logical slot

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

CC: @GJL

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink slotAssociation

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5089.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5089

commit d30dde83548dbeff4249f3b57b67cdb6247af510
Author: Till Rohrmann
Date: 2017-11-14T22:50:52Z
[FLINK-8078] Introduce LogicalSlot interface
The LogicalSlot interface decouples the task deployment from the actual slot implementation, which at the moment is Slot, SimpleSlot, and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6.

commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d
Author: Till Rohrmann
Date: 2017-11-15T13:20:27Z
[FLINK-8085] Thin out LogicalSlot interface
Remove the isCanceled and isReleased methods and decouple the logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by Execution and allows failing an implementation and obtaining a termination future. Introduce a proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released.

commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f
Author: Till Rohrmann
Date: 2017-11-24T17:03:49Z
[FLINK-8087] Decouple Slot from AllocatedSlot
This commit introduces the SlotContext, which is an abstraction for the SimpleSlot to obtain the relevant slot information for communicating with the TaskManager without relying on the AllocatedSlot, which is now only used by the SlotPool.

commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942
Author: Till Rohrmann
Date: 2017-11-24T17:06:10Z
[FLINK-8088] Associate logical slots with the slot request id
Before, logical slots like the SimpleSlot and SharedSlot were associated with the actually allocated slot via the AllocationID. This, however, was sub-optimal because allocated slots can be re-used to also fulfill other slot requests (logical slots). Therefore, we should bind the logical slots to the id with the right lifecycle, which is the slot request id.

> Bind logical slots to their request id instead of the slot allocation id
>
> Key: FLINK-8088
> URL: https://issues.apache.org/jira/browse/FLINK-8088
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
>
> Since allocated slots can be reused to fulfil multiple slot requests, we should bind the resulting logical slots to their slot request id instead of the allocation id of the underlying allocated slot.
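The re-binding described above can be illustrated with a minimal sketch. All names here are hypothetical simplifications, not Flink's real classes: the point is only that a physical slot (identified by an allocation id) can back several logical slots over time, so each logical slot is tracked under its own request id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative sketch only (hypothetical names, not Flink's real classes): an
// allocated slot can serve several logical slot requests over time, so logical
// slots are tracked under their own slot request id, not the allocation id.
class SlotAssociationSketch {
    // slot request id -> allocation id of the physical slot currently backing it
    private final Map<String, String> logicalSlotsByRequestId = new HashMap<>();

    String requestLogicalSlot(String allocationId) {
        String slotRequestId = UUID.randomUUID().toString();
        logicalSlotsByRequestId.put(slotRequestId, allocationId);
        return slotRequestId;
    }

    void releaseLogicalSlot(String slotRequestId) {
        // Ends only this logical slot; the allocated slot may back a new request next.
        logicalSlotsByRequestId.remove(slotRequestId);
    }

    int activeLogicalSlots() {
        return logicalSlotsByRequestId.size();
    }

    public static void main(String[] args) {
        SlotAssociationSketch pool = new SlotAssociationSketch();
        String r1 = pool.requestLogicalSlot("alloc-1");
        pool.requestLogicalSlot("alloc-1"); // the same physical slot backs a second logical slot
        pool.releaseLogicalSlot(r1);
        System.out.println(pool.activeLogicalSlots());
    }
}
```

Keying by the allocation id instead would make the two logical slots above collide, which is exactly the lifecycle mismatch the PR fixes.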
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266992#comment-16266992 ] ASF GitHub Bot commented on FLINK-7873: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153241166

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
+import org.apache.flink.runtime.checkpoint.CheckpointCache;
+import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both the remote end (e.g. DFS) and the local end.
+ * Local data is managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache} and {@link CheckpointStreamFactory} and
+ * creates a hybrid output stream from them; this hybrid output stream writes to both the remote end and the local end.
+ */
+public class CachedCheckpointStreamFactory implements CheckpointStreamFactory {
+
+	private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
+
+	private final CheckpointCache cache;
+	private final CheckpointStreamFactory remoteFactory;
+
+	public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) {
+		this.cache = cache;
+		this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null.");
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception {
+		return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false);
+	}
+
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder);
+		}
+		CachedOutputStream cachedOut = null;
+		if (cache != null) {
+			cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder);
+		}
+		CheckpointStateOutputStream remoteOut = null;
+		if (!placeholder) {
+			remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+		}
+		CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut);
+		return output;
+	}
+
+	@Override
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+		LOG.warn("create output stream which is not cacheable.");
+		return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+	}
+
+	@Override
+	public void close() throws Exception {
+		remoteFactory.close();
+	}
+
+	/**
+	 * A hybrid checkpoint output stream which writes data to both the remote end and the local end;
+	 * a failure to write data locally won't stop writing to the remote end. This hybrid output stream
+	 * will return a {@link CachedStreamStateHandle} in closeAndGetHandle(), which can be used to read data locally.
+	 */
+
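The "hybrid" stream idea in the diff above — every byte goes to the remote stream, while the local cache stream is best-effort — can be sketched in isolation. This is a simplified illustration, not the PR's actual `CachedCheckpointStateOutputStream`: the class name and structure here are invented, and only the failure semantics (a local write failure must not fail the checkpoint) are taken from the quoted javadoc.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Simplified sketch (not the PR's actual class): writes go to the required remote
// stream; the optional local cache stream is best-effort and is dropped on failure.
class HybridOutputStreamSketch extends OutputStream {
    private final OutputStream remote; // e.g. a DFS stream; required
    private OutputStream local;        // local cache stream; may be null

    HybridOutputStreamSketch(OutputStream remote, OutputStream local) {
        this.remote = remote;
        this.local = local;
    }

    @Override
    public void write(int b) throws IOException {
        remote.write(b); // a remote failure propagates and fails the checkpoint
        if (local != null) {
            try {
                local.write(b);
            } catch (IOException e) {
                // Writing locally failed: drop the cache, keep the checkpoint going.
                local = null;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream remote = new ByteArrayOutputStream();
        HybridOutputStreamSketch out = new HybridOutputStreamSketch(remote, null); // no local cache
        out.write(42);
        System.out.println(remote.size()); // remote received the byte
    }
}
```

The asymmetry is the design point: the remote copy is authoritative for recovery, so its failures propagate, while the local copy is only an optimization for reading checkpoint data locally during failover.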
[jira] [Commented] (FLINK-8078) Decouple Execution from actual slot implementation
[ https://issues.apache.org/jira/browse/FLINK-8078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267070#comment-16267070 ] ASF GitHub Bot commented on FLINK-8078: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5086 [FLINK-8078] Introduce LogicalSlot interface ## What is the purpose of the change The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is `SimpleSlot`. This is a helpful step to introduce a different slot implementation for Flip-6. ## Brief change log - Introduce `LogicalSlot` - Replace `SimpleSlot` usage by `LogicalSlot` - Let `SimpleSlot` implement the `LogicalSlot` interface ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink generalizeSlot Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5086.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5086 commit d30dde83548dbeff4249f3b57b67cdb6247af510 Author: Till RohrmannDate: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. > Decouple Execution from actual slot implementation > -- > > Key: FLINK-8078 > URL: https://issues.apache.org/jira/browse/FLINK-8078 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to plug in a different slot implementation, we should introduce a > slot interface which abstracts away the implementation details of > {{SimpleSlot}} wrt {{Execution}}. The reason this is necessary is to provide > a simpler slot implementation for Flip-6 since all allocation/release logic > will go through the {{SlotPool}}. Thus, we no longer need the concurrent > structure of {{Slot}}, {{SharedSlot}}, {{SimpleSlot}} and > {{SlotSharingGroupAssignment}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
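The decoupling described in the issue above can be sketched in a few lines. The names here are simplified placeholders, not Flink's real signatures: the point is only that deployment code depends on a narrow interface, so `SimpleSlot` and a future Flip-6 `SlotPool`-backed slot are interchangeable behind it.

```java
// Hypothetical sketch of the decoupling (simplified names, not Flink's real code):
// deployment code depends only on a narrow LogicalSlot interface, so SimpleSlot
// and a future Flip-6 slot implementation are interchangeable behind it.
class LogicalSlotSketch {
    interface LogicalSlot {
        String getTaskManagerLocation();
        boolean isAlive();
    }

    // Today's implementation; a Flip-6 slot would implement the same interface
    // without the concurrent Slot/SharedSlot/SimpleSlot hierarchy.
    static class SimpleSlot implements LogicalSlot {
        @Override public String getTaskManagerLocation() { return "taskmanager-1"; }
        @Override public boolean isAlive() { return true; }
    }

    // Deployment code is written against the interface only.
    static String deploy(LogicalSlot slot) {
        if (!slot.isAlive()) {
            throw new IllegalStateException("slot was released");
        }
        return "deploying task to " + slot.getTaskManagerLocation();
    }

    public static void main(String[] args) {
        System.out.println(deploy(new SimpleSlot()));
    }
}
```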
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267112#comment-16267112 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153261175

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -152,6 +170,26 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ... }
+	@Override
+	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
+		if (msg instanceof RemoteInputChannel) {
+			boolean triggerWrite = inputChannelsWithCredit.isEmpty();
--- End diff --

how about some small comment as in `PartitionRequestQueue`? Something like
```
// Queue an input channel for available credits announcement.
// If the queue is empty, we try to trigger the actual write.
// Otherwise this will be handled by the
// writeAndFlushNextMessageIfPossible calls.
```

> Implement Netty receiver outgoing pipeline for credit-based
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.5.0
>
> This is a part of the work for credit-based network flow control. The related works are:
> * We define a new message called {{AddCredit}} to notify the incremental credit during data shuffle.
> * Whenever an {{InputChannel}}'s unannounced credit goes up from zero, the channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and contain as much credit as is available for the channel at that point in time. Otherwise, it would only add latency to the announcements and not increase throughput.
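The announcement protocol described in the issue above can be sketched concretely. This is an illustrative simplification with hypothetical names, not Flink's actual `CreditBasedClientHandler`: a channel is enqueued only on the zero-to-positive transition of its unannounced credit, and each announcement drains all credit accumulated up to that point.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the credit announcement protocol (hypothetical names): enqueue a
// channel only when its unannounced credit rises from zero; each announcement
// sends all accumulated credit and resets the counter.
class CreditAnnouncementSketch {
    static class InputChannel {
        final String id;
        int unannouncedCredit;
        InputChannel(String id) { this.id = id; }
    }

    private final Deque<InputChannel> channelsWithCredit = new ArrayDeque<>();

    void addCredit(InputChannel channel, int credit) {
        boolean wasZero = channel.unannouncedCredit == 0;
        channel.unannouncedCredit += credit;
        if (wasZero) {
            channelsWithCredit.add(channel); // enqueue only on the 0 -> >0 transition
        }
    }

    // Called whenever the network channel becomes writable: announce credit for
    // the next queued channel, sending everything collected so far.
    String announceNext() {
        InputChannel channel = channelsWithCredit.poll();
        if (channel == null) {
            return null;
        }
        int credit = channel.unannouncedCredit;
        channel.unannouncedCredit = 0; // reset after each send
        return "AddCredit(" + channel.id + ", " + credit + ")";
    }

    public static void main(String[] args) {
        CreditAnnouncementSketch handler = new CreditAnnouncementSketch();
        InputChannel ch = new InputChannel("ch-0");
        handler.addCredit(ch, 2);
        handler.addCredit(ch, 3); // already queued: no duplicate enqueue
        System.out.println(handler.announceNext());
    }
}
```

Batching credit this way is what the issue means by "as much credit as is available at that point in time": announcing on every increment would add latency without increasing throughput.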
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267111#comment-16267111 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153266186

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -539,4 +542,60 @@ static CloseRequest readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws { return new CloseRequest(); } }
+	static class AddCredit extends NettyMessage {
--- End diff --

Please add a comment ("incremental credit announcement from the client to the server"?).
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267113#comment-16267113 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153269318

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -283,10 +283,13 @@ public String toString() {
	//
	/**
-	 * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+	 * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
	 */
	void notifyCreditAvailable() {
-		//TODO in next PR
+		// We should skip the notification if this channel is already released.
+		if (!isReleased.get() && partitionRequestClient != null) {
--- End diff --

shouldn't we
```
checkState(partitionRequestClient != null, "Tried to send credit announcement to producer before requesting a queue.");
```
here as well? At the moment I don't see a valid usecase for `== null` and only a potential problem with the notification not being tried again.
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267109#comment-16267109 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153258008

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -88,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) { ... }
+	void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
--- End diff --

Can you please add a comment under which circumstances not to call this, i.e. we must make sure `ctx` has been assigned (so after the channel has been activated somehow). I checked the uses of this method and those seem to be safe, i.e. in `RemoteInputChannel`'s `#notifyBufferAvailable()`, `#onSenderBacklog()`, and `#recycle()`. All of these should only happen after some interaction with the channel.
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267110#comment-16267110 ] ASF GitHub Bot commented on FLINK-7416: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153263139

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -274,4 +313,49 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) { ... bufferOrEvent.releaseBuffer(); } }
+	private void writeAndFlushNextMessageIfPossible(Channel channel) {
--- End diff --

Please add some javadoc with a hint how all `inputChannelsWithCredit` will be handled, i.e. one is written immediately, following ones after successful writes.
[jira] [Closed] (FLINK-8154) JobSubmissionClientActor submited job,but there is no connection to a JobManager
[ https://issues.apache.org/jira/browse/FLINK-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gregory Melekh closed FLINK-8154. Resolution: Not A Problem

> JobSubmissionClientActor submitted job, but there is no connection to a JobManager
>
> Key: FLINK-8154
> URL: https://issues.apache.org/jira/browse/FLINK-8154
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.3.2
> Environment: Kubernetes 1.8.3, Platform "Linux/amd64"
> Reporter: Gregory Melekh
> Priority: Blocker
>
> The JobManager log file is below.
> 2017-11-26 08:17:13,435 INFO org.apache.flink.client.CliFrontend -
> 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend - Starting Command Line Client (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
> 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend - Current user: root
> 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend - Maximum heap size: 6252 MiBytes
> 2017-11-26 08:17:13,437 INFO org.apache.flink.client.CliFrontend - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2017-11-26 08:17:13,439 INFO org.apache.flink.client.CliFrontend - Hadoop version: 2.7.2
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - JVM Options:
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - -Dlog.file=/opt/flink/log/flink--client-act-monitor-flink-jobmanager-66cd4bdb5c-8kxbh.log
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - -Dlog4j.configuration=file:/etc/flink/log4j-cli.properties
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - -Dlogback.configurationFile=file:/etc/flink/logback.xml
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - Program Arguments:
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - run
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - -c
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - monitoring.flow.AccumulateAll
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - /tmp/monitoring-0.0.1-SNAPSHOT.jar
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - Classpath: /opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar:::
> 2017-11-26 08:17:13,440 INFO org.apache.flink.client.CliFrontend - Using configuration directory /etc/flink
> 2017-11-26 08:17:13,441 INFO org.apache.flink.client.CliFrontend - Trying to load configuration file
> 2017-11-26 08:17:13,443 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: blob.server.port, 6124
> 2017-11-26 08:17:13,443 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, act-monitor-flink-jobmanager
> 2017-11-26 08:17:13,443 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
> 2017-11-26 08:17:13,443 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024
> 2017-11-26 08:17:13,443 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.log.path, /etc/flink/log/act-monitor-flink-jobmanager.log
> 2017-11-26 08:17:13,444 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024
> 2017-11-26 08:17:13,444 INFO org.apache.flink.configuration.GlobalConfiguration
[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153234962 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -190,6 +199,11 @@ public static TaskManagerServices fromConfiguration( final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); + final CheckpointCacheManager checkpointCacheManager = new CheckpointCacheManager( + new ScheduledThreadPoolExecutor(1), + Executors.directExecutor(), + taskManagerServicesConfiguration.getTmpDirPaths()[0]); --- End diff -- I find this problematic, because it does not consider all the configured tmp directories. While most users probably have only a single tmp directory configured, this can be problematic if somebody makes use of multiple directories (e.g. to utilize multiple smaller disks). We should also be sensitive about this case. ---
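The review point above — the cache always uses `getTmpDirPaths()[0]` — could be addressed by spreading cache files over all configured tmp directories. A minimal sketch of one option, round-robin selection (class and method names here are hypothetical, not Flink's actual API):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: pick a tmp directory round-robin so that multiple
// smaller disks share the cache load instead of everything landing on
// tmpDirPaths[0].
class CacheDirectoryChooser {
    private final String[] tmpDirPaths;
    private final AtomicInteger next = new AtomicInteger(0);

    CacheDirectoryChooser(String[] tmpDirPaths) {
        this.tmpDirPaths = tmpDirPaths;
    }

    // Round-robin over all configured directories; floorMod keeps the index
    // valid even after the counter overflows.
    String nextDirectory() {
        int i = Math.floorMod(next.getAndIncrement(), tmpDirPaths.length);
        return tmpDirPaths[i];
    }
}
```

A real implementation might instead weight directories by free space, but even plain round-robin avoids the single-disk hotspot the review describes.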
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266965#comment-16266965 ] ASF GitHub Bot commented on FLINK-7873: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153234962 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java --- @@ -190,6 +199,11 @@ public static TaskManagerServices fromConfiguration( final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); + final CheckpointCacheManager checkpointCacheManager = new CheckpointCacheManager( + new ScheduledThreadPoolExecutor(1), + Executors.directExecutor(), + taskManagerServicesConfiguration.getTmpDirPaths()[0]); --- End diff -- I find this problematic, because it does not consider all the configured tmp directories. While most users probably have only a single tmp directory configured, this can be problematic if somebody makes use of multiple directories (e.g. to utilize multiple smaller disks). We should also be sensitive about this case. > Introduce CheckpointCacheManager for reading checkpoint data locally when > performing failover > - > > Key: FLINK-7873 > URL: https://issues.apache.org/jira/browse/FLINK-7873 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Sihua Zhou >Assignee: Sihua Zhou > > Why I introduce this: > The current recovery strategy always reads checkpoint data from remote > FileStream (HDFS). This costs a lot of bandwidth when the state is big > (e.g. 1 TB). What's worse, if the job performs recovery again and again, it can > eat up all network bandwidth and badly hurt the cluster. So, I propose > that we cache the checkpoint data locally and read checkpoint data from the > local cache whenever we can, reading the data from remote storage only if we fail > locally. 
The advantage is that if an execution is assigned to the same > TaskManager as before, it can save a lot of bandwidth and obtain a faster > recovery. > Solution: > The TaskManager does the caching and manages the cached data itself. It simply > uses a TTL-like method to manage cache entry disposal: we dispose of an entry if > it wasn't touched for X time, and once we touch an entry we reset its TTL. > In this way, all the work is done by the TaskManager, transparently to the > JobManager. The only problem is that we may dispose of an entry that is still > useful; in this case, we have to read from remote storage in the end, but users can > avoid this by setting a proper TTL value according to the checkpoint interval and > other factors. > Can someone give me some advice? I would appreciate it very much~ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
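The TTL scheme described in the proposal can be sketched as follows. This is purely illustrative (the real `CheckpointCacheManager` API differs); time is passed in explicitly so the behavior is deterministic, and the sweep would run on a scheduled executor in practice:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the TTL idea above: each touch resets an entry's deadline; a
// periodic sweep disposes of entries untouched for longer than the TTL.
class TtlCache<K, V> {
    private static final class Entry<V> {
        final V value;
        volatile long deadline;
        Entry(V value, long deadline) { this.value = value; this.deadline = deadline; }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttl;

    TtlCache(long ttl) { this.ttl = ttl; }

    void put(K key, V value, long now) {
        entries.put(key, new Entry<>(value, now + ttl));
    }

    // Touching an entry resets its TTL, exactly as the proposal describes.
    V get(K key, long now) {
        Entry<V> e = entries.get(key);
        if (e == null) { return null; }  // caller falls back to a remote (HDFS) read
        e.deadline = now + ttl;
        return e.value;
    }

    // Dispose of entries whose deadline has passed.
    void sweep(long now) {
        entries.values().removeIf(e -> e.deadline <= now);
    }
}
```

The trade-off the reporter mentions is visible here: if the sweep fires between checkpoints, a still-useful entry is lost and recovery falls back to the remote read, so the TTL should exceed the checkpoint interval.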
[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153235421 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java --- @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle( byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); + } else if (stateHandle instanceof CachedStreamStateHandle) { --- End diff -- This means we are actually introducing significant new code to the job manager, that even impacts the serialization format. I think this should not strictly be required if we map local state to checkpoint ids. ---
[jira] [Commented] (FLINK-8087) Decouple Slot from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-8087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267088#comment-16267088 ] ASF GitHub Bot commented on FLINK-8087: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5088 [FLINK-8087] Decouple Slot from AllocatedSlot ## What is the purpose of the change This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the TaskManager without relying on the AllocatedSlot which is now only used by the SlotPool. This PR is based on #5087. ## Brief change log - Introduce `SlotContext` as simple abstraction for slot related information - Remove dependency of `Slot` on `AllocatedSlot` which is now only used internally by the `SlotPool`. - Introduce `SimpleSlotContext` which implements `SlotContext` and acts as the slot context for the `SimpleSlot`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) CC: @GJL You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink decoupleSlotFromSlotPool Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5088.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5088 commit d30dde83548dbeff4249f3b57b67cdb6247af510 Author: Till Rohrmann Date: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d Author: Till Rohrmann Date: 2017-11-15T13:20:27Z [FLINK-8085] Thin out LogicalSlot interface Remove isCanceled, isReleased method and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows to fail an implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f Author: Till Rohrmann Date: 2017-11-24T17:03:49Z [FLINK-8087] Decouple Slot from AllocatedSlot This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the TaskManager without relying on the AllocatedSlot which is now only used by the SlotPool. 
> Decouple Slot from SlotPool > --- > > Key: FLINK-8087 > URL: https://issues.apache.org/jira/browse/FLINK-8087 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to let the {{SlotPool}} return a different {{LogicalSlot}} > implementation than {{SimpleSlot}} we should not store the {{Slot}} inside of > the {{SlotPool}}. Moreover, we should introduce an abstraction for the > {{AllocatedSlot}} which contains the information required by the > {{SimpleSlot}}. That way we decouple the {{SimpleSlot}} from the > {{AllocatedSlot}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
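The decoupling FLINK-8087 describes can be illustrated with a minimal sketch. The interface shape below is an assumption for illustration, not the actual Flink code: the slot reads its information through a `SlotContext`-style abstraction, so the `SlotPool` can keep `AllocatedSlot` private:

```java
// Hypothetical stand-in for SlotContext: the minimal information a slot
// needs to talk to its TaskManager.
interface SlotContextSketch {
    String getTaskManagerAddress();
}

// Stand-in for SimpleSlotContext: a plain value object behind the interface.
class SimpleSlotContextSketch implements SlotContextSketch {
    private final String taskManagerAddress;

    SimpleSlotContextSketch(String taskManagerAddress) {
        this.taskManagerAddress = taskManagerAddress;
    }

    public String getTaskManagerAddress() {
        return taskManagerAddress;
    }
}

// The slot no longer references AllocatedSlot directly; it only sees the
// context, so the pool is free to hand out any implementation.
class SimpleSlotSketch {
    private final SlotContextSketch context;

    SimpleSlotSketch(SlotContextSketch context) { this.context = context; }

    String deployTarget() { return context.getTaskManagerAddress(); }
}
```

The point of the indirection is exactly what the ticket states: the pool can later return a different `LogicalSlot` implementation without the slot code changing.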
[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5088 [FLINK-8087] Decouple Slot from AllocatedSlot ## What is the purpose of the change This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the TaskManager without relying on the AllocatedSlot which is now only used by the SlotPool. This PR is based on #5087. ## Brief change log - Introduce `SlotContext` as simple abstraction for slot related information - Remove dependency of `Slot` on `AllocatedSlot` which is now only used internally by the `SlotPool`. - Introduce `SimpleSlotContext` which implements `SlotContext` and acts as the slot context for the `SimpleSlot`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) CC: @GJL You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink decoupleSlotFromSlotPool Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5088.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5088 commit d30dde83548dbeff4249f3b57b67cdb6247af510 Author: Till Rohrmann Date: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d Author: Till Rohrmann Date: 2017-11-15T13:20:27Z [FLINK-8085] Thin out LogicalSlot interface Remove isCanceled, isReleased method and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows to fail an implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f Author: Till Rohrmann Date: 2017-11-24T17:03:49Z [FLINK-8087] Decouple Slot from AllocatedSlot This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the TaskManager without relying on the AllocatedSlot which is now only used by the SlotPool. ---
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266940#comment-16266940 ] ASF GitHub Bot commented on FLINK-7499: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4581 Still failing with some checkstyle violations in `ResultPartition.java` > double buffer release in SpillableSubpartitionView > -- > > Key: FLINK-7499 > URL: https://issues.apache.org/jira/browse/FLINK-7499 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, > 1.3.2, 1.3.3 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: > once asynchronously after the write operation and once in > {{SpillableSubpartitionView#releaseMemory()}} after adding the write > operation to the queue. > 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer > is recycled, the memory region may already be reused despite the pending write > 2) If, for some reason (probably only in tests like > {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?), > the buffer is retained and to be used in parallel somewhere else it may also > not be available anymore or contain corrupt data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
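The double-release race FLINK-7499 describes — one recycle from the asynchronous write, one from `releaseMemory()` — is the classic case for reference counting. A sketch of the idea (illustrative only, not Flink's actual `Buffer` class): each holder takes a reference, the segment returns to the pool only when the count reaches zero, and a second release of the same reference fails loudly instead of corrupting reused memory:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted buffer: created with one reference held by
// its creator; retain() adds a holder, recycle() releases one.
class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private volatile boolean returnedToPool = false;

    void retain() {
        if (refCount.getAndIncrement() <= 0) {
            throw new IllegalStateException("buffer already released");
        }
    }

    // The memory segment is reused only after the LAST holder releases it,
    // so a pending write can never see its memory recycled underneath it.
    void recycle() {
        int remaining = refCount.decrementAndGet();
        if (remaining == 0) {
            returnedToPool = true;
        } else if (remaining < 0) {
            throw new IllegalStateException("double release detected");
        }
    }

    boolean isRecycled() { return returnedToPool; }
}
```

With this discipline, the scenario in the ticket (release-before-write-completes) keeps the segment alive for the in-flight write, and the test-only double release becomes an explicit error.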
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266966#comment-16266966 ] ASF GitHub Bot commented on FLINK-7873: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5074#discussion_r153235421 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java --- @@ -510,6 +512,13 @@ private static void serializeStreamStateHandle( byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); + } else if (stateHandle instanceof CachedStreamStateHandle) { --- End diff -- This means we are actually introducing significant new code to the job manager, that even impacts the serialization format. I think this should not strictly be required if we map local state to checkpoint ids. > Introduce CheckpointCacheManager for reading checkpoint data locally when > performing failover > - > > Key: FLINK-7873 > URL: https://issues.apache.org/jira/browse/FLINK-7873 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Sihua Zhou >Assignee: Sihua Zhou > > Why I introduce this: > The current recovery strategy always reads checkpoint data from remote > FileStream (HDFS). This costs a lot of bandwidth when the state is big > (e.g. 1 TB). What's worse, if the job performs recovery again and again, it can > eat up all network bandwidth and badly hurt the cluster. So, I propose > that we cache the checkpoint data locally and read checkpoint data from the > local cache whenever we can, reading the data from remote storage only if we fail > locally. The advantage is that if an execution is assigned to the same > TaskManager as before, it can save a lot of bandwidth and obtain a faster > recovery. > Solution: > The TaskManager does the caching and manages the cached data itself. 
It simply > uses a TTL-like method to manage cache entry disposal: we dispose of an entry if > it wasn't touched for X time, and once we touch an entry we reset its TTL. > In this way, all the work is done by the TaskManager, transparently to the > JobManager. The only problem is that we may dispose of an entry that is still > useful; in this case, we have to read from remote storage in the end, but users can > avoid this by setting a proper TTL value according to the checkpoint interval and > other factors. > Can someone give me some advice? I would appreciate it very much~ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8161) Flakey YARNSessionCapacitySchedulerITCase on Travis
Till Rohrmann created FLINK-8161: Summary: Flakey YARNSessionCapacitySchedulerITCase on Travis Key: FLINK-8161 URL: https://issues.apache.org/jira/browse/FLINK-8161 Project: Flink Issue Type: Bug Components: Tests, YARN Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Critical Fix For: 1.5.0 The {{YARNSessionCapacitySchedulerITCase}} spuriously fails on Travis because it now contains {{2017-11-25 22:49:49,204 WARN akka.remote.transport.netty.NettyTransport- Remote connection to [null] failed with java.nio.channels.NotYetConnectedException}} from time to time in the logs. I suspect that this is due to switching from Flakka to Akka 2.4.0. In order to solve this problem I propose to add this log statement to the whitelisted log statements. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
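The fix proposed in FLINK-8161 — whitelisting a known-benign log line — typically looks like the sketch below: a test scans the collected logs for WARN/ERROR lines and only fails on lines that match no whitelist pattern. The class name and pattern list here are assumptions for illustration, not the actual YARN test utility:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical whitelist check: benign, known-flaky log lines are ignored,
// everything else at WARN/ERROR fails the test.
class LogWhitelistSketch {
    private static final List<Pattern> WHITELIST = Arrays.asList(
        // The NettyTransport warning from the ticket, caused by the Akka
        // 2.4.0 upgrade, is benign and should not fail the test.
        Pattern.compile(".*Remote connection to \\[null\\] failed with "
            + "java\\.nio\\.channels\\.NotYetConnectedException.*"));

    static boolean isWhitelisted(String logLine) {
        return WHITELIST.stream().anyMatch(p -> p.matcher(logLine).matches());
    }
}
```

Matching on a pattern rather than an exact string keeps the whitelist robust against timestamps and thread names that vary between runs.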
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r153253247 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** +* Add the exclusive buffer into the queue, and recycle one floating buffer if the +* number of available buffers in queue is more than required amount. +* +* @param buffer The exclusive buffer of this channel. +* @return Whether to recycle one floating buffer. +*/ + boolean maintainTargetSize(Buffer buffer) { --- End diff -- Accidentally found that this was not addressed in the newest commits...can you change this? ---
[jira] [Commented] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1
[ https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267037#comment-16267037 ] Greg Hogan commented on FLINK-7642: --- Please stop adding and removing a blank line from the description every few days. Does 2.20.1 resolve SUREFIRE-1255? From the parent {{pom.xml}}: {noformat} 2.18.1 {noformat} > Upgrade maven surefire plugin to 2.19.1 > --- > > Key: FLINK-7642 > URL: https://issues.apache.org/jira/browse/FLINK-7642 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu > > The Surefire 2.19 release introduced more useful test filters which would let us > run a subset of the tests. > This issue is for upgrading the maven surefire plugin to 2.19.1 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5091 [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool ## What is the purpose of the change This commit adds support for queued scheduling with slot sharing to the SlotPool. The idea of slot sharing is that multiple tasks can run in the same slot. Moreover, queued scheduling means that a slot request does not have to be completed right away but can be completed at a later point in time. This makes it possible to start new TaskExecutors in case there are no more slots left. The main component responsible for the management of shared slots is the SlotSharingManager. The SlotSharingManager maintains internally a tree-like structure which stores the SlotContext future of the underlying AllocatedSlot. Whenever this future is completed, potentially pending LogicalSlot instantiations are executed and sent to the slot requester. A shared slot is represented by a MultiTaskSlot which can harbour multiple TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot. In order to represent co-location constraints, we first obtain a root MultiTaskSlot and then allocate a nested MultiTaskSlot in which the co-located tasks are allocated. The corresponding SlotRequestID is assigned to the CoLocationConstraint in order to make the TaskSlot retrievable for other tasks assigned to the same CoLocationConstraint. This PR also moves the `SlotPool` components to `o.a.f.runtime.jobmaster.slotpool`. 
This PR is based on #5090 ## Brief change log - Add `SlotSharingManager` to manage shared slots - Rework `SlotPool` to use `SlotSharingManager` - Add `SlotPool#allocateMultiTaskSlot` to allocate a shared slot - Add `SlotPool#allocateCoLocatedMultiTaskSlot` to allocate a co-located slot - Move `SlotPool` components to `o.a.f.runtime.jobmaster.slotpool` ## Verifying this change - Port `SchedulerSlotSharingTest`, `SchedulerIsolatedTasksTest` and `ScheduleWithCoLocationHintTest` to run with `SlotPool` - Add `SlotSharingManagerTest`, `SlotPoolSlotSharingTest` and `SlotPoolCoLocationTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) CC: @GJL You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink slotPoolSlots Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5091.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5091 commit d30dde83548dbeff4249f3b57b67cdb6247af510 Author: Till Rohrmann Date: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. 
commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d Author: Till Rohrmann Date: 2017-11-15T13:20:27Z [FLINK-8085] Thin out LogicalSlot interface Remove isCanceled, isReleased method and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows to fail an implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f Author: Till Rohrmann Date: 2017-11-24T17:03:49Z [FLINK-8087] Decouple Slot from AllocatedSlot This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the TaskManager without relying on the AllocatedSlot which is now only used by the SlotPool. commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942 Author: Till Rohrmann Date: 2017-11-24T17:06:10Z [FLINK-8088]
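The tree structure the PR description sketches — a root `MultiTaskSlot` holding nested multi-slots for co-location constraints and `SingleTaskSlot` leaves for individual tasks — can be illustrated minimally. The names follow the PR, but the implementation below is purely illustrative, not the actual `SlotSharingManager`:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative base type: every slot in the tree is identified by a request id.
abstract class TaskSlotSketch {
    final String requestId;
    TaskSlotSketch(String requestId) { this.requestId = requestId; }
    abstract int logicalSlotCount();
}

// Leaf: produces exactly one logical slot for one task.
class SingleTaskSlotSketch extends TaskSlotSketch {
    SingleTaskSlotSketch(String requestId) { super(requestId); }
    int logicalSlotCount() { return 1; }
}

// Inner node: a shared slot that can harbour nested task slots. A nested
// MultiTaskSlot models a co-location constraint.
class MultiTaskSlotSketch extends TaskSlotSketch {
    private final List<TaskSlotSketch> children = new ArrayList<>();
    MultiTaskSlotSketch(String requestId) { super(requestId); }

    MultiTaskSlotSketch allocateMultiTaskSlot(String requestId) {
        MultiTaskSlotSketch child = new MultiTaskSlotSketch(requestId);
        children.add(child);
        return child;
    }

    SingleTaskSlotSketch allocateSingleTaskSlot(String requestId) {
        SingleTaskSlotSketch child = new SingleTaskSlotSketch(requestId);
        children.add(child);
        return child;
    }

    int logicalSlotCount() {
        int n = 0;
        for (TaskSlotSketch c : children) { n += c.logicalSlotCount(); }
        return n;
    }
}
```

All logical slots in one tree ultimately run in the single physical slot backing the root, which is what makes the sharing work.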
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267135#comment-16267135 ] ASF GitHub Bot commented on FLINK-7956: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5091 [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool ## What is the purpose of the change This commit adds support for queued scheduling with slot sharing to the SlotPool. The idea of slot sharing is that multiple tasks can run in the same slot. Moreover, queued scheduling means that a slot request does not have to be completed right away but can be completed at a later point in time. This makes it possible to start new TaskExecutors in case there are no more slots left. The main component responsible for the management of shared slots is the SlotSharingManager. The SlotSharingManager maintains internally a tree-like structure which stores the SlotContext future of the underlying AllocatedSlot. Whenever this future is completed, potentially pending LogicalSlot instantiations are executed and sent to the slot requester. A shared slot is represented by a MultiTaskSlot which can harbour multiple TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot. In order to represent co-location constraints, we first obtain a root MultiTaskSlot and then allocate a nested MultiTaskSlot in which the co-located tasks are allocated. The corresponding SlotRequestID is assigned to the CoLocationConstraint in order to make the TaskSlot retrievable for other tasks assigned to the same CoLocationConstraint. This PR also moves the `SlotPool` components to `o.a.f.runtime.jobmaster.slotpool`. 
This PR is based on #5090 ## Brief change log - Add `SlotSharingManager` to manage shared slots - Rework `SlotPool` to use `SlotSharingManager` - Add `SlotPool#allocateMultiTaskSlot` to allocate a shared slot - Add `SlotPool#allocateCoLocatedMultiTaskSlot` to allocate a co-located slot - Move `SlotPool` components to `o.a.f.runtime.jobmaster.slotpool` ## Verifying this change - Port `SchedulerSlotSharingTest`, `SchedulerIsolatedTasksTest` and `ScheduleWithCoLocationHintTest` to run with `SlotPool` - Add `SlotSharingManagerTest`, `SlotPoolSlotSharingTest` and `SlotPoolCoLocationTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) CC: @GJL You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink slotPoolSlots Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5091.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5091 commit d30dde83548dbeff4249f3b57b67cdb6247af510 Author: Till Rohrmann Date: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. 
commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d Author: Till Rohrmann Date: 2017-11-15T13:20:27Z [FLINK-8085] Thin out LogicalSlot interface Remove isCanceled, isReleased method and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows to fail an implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f Author: Till Rohrmann Date: 2017-11-24T17:03:49Z [FLINK-8087] Decouple Slot from AllocatedSlot This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r152985049 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - if (gate.getConsumedPartitionType().isCreditBased()) { - // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly - bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - } else { - int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - gate.getNumberOfInputChannels() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), - maxNumberOfMemorySegments); - } + // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly + bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); + gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); --- End diff -- What about the non-bounded partition type that we use for batch processing? Shouldn't we use an unbounded number of floating buffers there, as previously? ---
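The branch removed in the diff above computed the buffer pool bound from the partition type: bounded partitions get a cap of channels times per-channel buffers plus a per-gate extra, while unbounded (batch) partitions were effectively unlimited — which is exactly what the reviewer asks about. The sizing rule itself is simple enough to state directly (the method below is a sketch; parameter names mirror the diff):

```java
// Sketch of the pool-size rule visible in the removed code path.
class BufferPoolSizing {
    static int maxMemorySegments(boolean bounded, int numInputChannels,
                                 int buffersPerChannel, int extraBuffersPerGate) {
        return bounded
            ? numInputChannels * buffersPerChannel + extraBuffersPerGate
            : Integer.MAX_VALUE;  // unbounded partitions had no cap
    }
}
```

For example, a gate with 4 channels, 2 buffers per channel, and 8 extra floating buffers is capped at 16 segments when bounded, but uncapped for batch-style unbounded partitions.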
[jira] [Commented] (FLINK-7717) Port TaskManagerMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267007#comment-16267007 ] ASF GitHub Bot commented on FLINK-7717: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5081#discussion_r153243460 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TaskManagerMetricsHandler.java --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Handler that returns TaskManager metrics. + * + * @see MetricStore#getTaskManagerMetricStore(String) + */ +public class TaskManagerMetricsHandler extends AbstractMetricsHandler { + + public TaskManagerMetricsHandler( + final CompletableFuture localRestAddress, + final GatewayRetriever leaderRetriever, + final Time timeout, + final Map headers, + final MetricFetcher metricFetcher) { + super(localRestAddress, leaderRetriever, timeout, headers, TaskManagerMetricsHeaders.getInstance(), + metricFetcher); + } + + @Nullable + @Override + protected MetricStore.ComponentMetricStore getComponentMetricStore( + final HandlerRequest request, + final MetricStore metricStore) { + final InstanceID pathParameter = request.getPathParameter(TaskManagerIdPathParameter.class); --- End diff -- This PR should probably be merged after [FLINK-8150](https://issues.apache.org/jira/browse/FLINK-8150). 
> Port TaskManagerMetricsHandler to new REST endpoint > --- > > Key: FLINK-7717 > URL: https://issues.apache.org/jira/browse/FLINK-7717 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Port {{TaskManagerMetricsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267030#comment-16267030 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r153253247 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** +* Add the exclusive buffer into the queue, and recycle one floating buffer if the +* number of available buffers in queue is more than required amount. +* +* @param buffer The exclusive buffer of this channel. +* @return Whether to recycle one floating buffer. +*/ + boolean maintainTargetSize(Buffer buffer) { --- End diff -- Accidentally found that this was not addressed in the newest commits...can you change this? > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. 
> The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
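The related works listed above can be sketched in a simplified model. This is a hypothetical, illustrative sketch (plain objects stand in for Flink's Buffer and BufferPool types, and all names are stand-ins): the producer's backlog arrives with each `BufferResponse`, and the channel requests floating buffers until its available buffers cover the backlog.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of the receiver-side credit path: updating the backlog may
// trigger floating-buffer requests from the pool. Not Flink's actual classes.
class RemoteChannelSketch {
    private final Queue<Object> exclusiveBuffers = new ArrayDeque<>();
    private final Queue<Object> floatingBuffers = new ArrayDeque<>();
    int floatingBuffersRequested;

    RemoteChannelSketch(int exclusiveCount) {
        for (int i = 0; i < exclusiveCount; i++) {
            exclusiveBuffers.add(new Object());
        }
    }

    int availableBuffers() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    // Called when a BufferResponse carries the producer's current backlog.
    void onSenderBacklog(int backlog) {
        while (availableBuffers() < backlog) {
            // Stand-in for requesting a floating buffer from the BufferPool.
            floatingBuffers.add(new Object());
            floatingBuffersRequested++;
        }
    }
}
```

With 2 exclusive buffers, a backlog of 5 would request 3 floating buffers; a subsequent smaller backlog requests nothing further.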
[jira] [Commented] (FLINK-8089) Fulfil slot requests with unused offered slots
[ https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267101#comment-16267101 ] ASF GitHub Bot commented on FLINK-8089: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5090 [FLINK-8089] Also check for other pending slot requests in SlotPool#offerSlot ## What is the purpose of the change Not only check for a slot request with the right allocation id but also check whether we can fulfill other pending slot requests with an unclaimed offered slot before adding it to the list of available slots in `SlotPool`. This PR is based on #5089. ## Verifying this change - Added `SlotPoolTest#testFulfillingSlotRequestsWithUnusedOfferedSlots` to check that unused offered slots are directly used to fulfill other pending slot requests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) CC: @GJL You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixSlotOffering Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5090.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5090 commit d30dde83548dbeff4249f3b57b67cdb6247af510 Author: Till Rohrmann Date: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d Author: Till Rohrmann Date: 2017-11-15T13:20:27Z [FLINK-8085] Thin out LogicalSlot interface Remove isCanceled, isReleased method and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows failing the implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. commit 84d86bebe2f9f8395430e7c71dd2393ba117b44f Author: Till Rohrmann Date: 2017-11-24T17:03:49Z [FLINK-8087] Decouple Slot from AllocatedSlot This commit introduces the SlotContext which is an abstraction for the SimpleSlot to obtain the relevant slot information to do the communication with the TaskManager without relying on the AllocatedSlot which is now only used by the SlotPool. commit 80a3cc848a0c724a2bc09b1b967cc9e6ccec5942 Author: Till Rohrmann Date: 2017-11-24T17:06:10Z [FLINK-8088] Associate logical slots with the slot request id Before logical slots like the SimpleSlot and SharedSlot were associated with the actually allocated slot via the AllocationID. 
This, however, was sub-optimal because allocated slots can be re-used to also fulfill other slot requests (logical slots). Therefore, we should bind the logical slots to the right id with the right lifecycle, which is the slot request id. commit 3e4550c0607744b20893dc90c587b63e68e4de1e Author: Till Rohrmann Date: 2017-11-13T14:42:07Z [FLINK-8089] Also check for other pending slot requests in offerSlot Not only check for a slot request with the right allocation id but also check whether we can fulfill other pending slot requests with an unclaimed offered slot before adding it to the list of available slots. commit b04dda46aaf298d921929910574662970d9c5093 Author: Till Rohrmann Date: 2017-11-24T22:29:53Z [hotfix] Speed up RecoveryITCase > Fulfil slot requests with unused offered slots > -- > > Key: FLINK-8089 > URL: https://issues.apache.org/jira/browse/FLINK-8089 > Project: Flink > Issue Type: Improvement > Components:
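The core idea of the change described in this PR can be sketched with a toy model. This is an illustrative sketch only (strings stand in for slots and allocation ids; the real `SlotPool` is far more involved): when the request that originally asked for an allocation id is gone, any other pending request is fulfilled before the slot is parked as available.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Toy model of the offerSlot change: an unclaimed offered slot fulfils another
// pending slot request instead of going straight to the available pool.
class SlotPoolSketch {
    final Map<String, CompletableFuture<String>> pendingRequests = new LinkedHashMap<>();
    final List<String> availableSlots = new ArrayList<>();

    void offerSlot(String allocationId, String slot) {
        CompletableFuture<String> request = pendingRequests.remove(allocationId);
        if (request == null && !pendingRequests.isEmpty()) {
            // New behaviour: fulfil some other pending request with this slot.
            String otherId = pendingRequests.keySet().iterator().next();
            request = pendingRequests.remove(otherId);
        }
        if (request != null) {
            request.complete(slot);
        } else {
            availableSlots.add(slot);
        }
    }
}
```

Under the old behaviour, a slot offered for an unknown allocation id would land in `availableSlots` even while other requests waited; the extra branch avoids that.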
[GitHub] flink pull request #:
Github user zentol commented on the pull request: https://github.com/apache/flink/commit/c940d5eff9897796625a696ed2989aed52c39ebd#commitcomment-25875079 In tools/releasing/create_source_release.sh: In tools/releasing/create_source_release.sh on line 60: maybe we should have a wildcard exclude for all files/directories starting with `.`. ---
[jira] [Commented] (FLINK-7642) Upgrade maven surefire plugin to 2.19.1
[ https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267051#comment-16267051 ] Ted Yu commented on FLINK-7642: --- Over in hbase, we use 2.20.1. SUREFIRE-1255 has been inactive - it seems 2.20.1 should not have the test hang issue. > Upgrade maven surefire plugin to 2.19.1 > --- > > Key: FLINK-7642 > URL: https://issues.apache.org/jira/browse/FLINK-7642 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu > > The Surefire 2.19 release introduced more useful test filters which would let us > run a subset of the tests. > This issue is for upgrading the maven surefire plugin to 2.19.1 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266984#comment-16266984 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r152985049 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - if (gate.getConsumedPartitionType().isCreditBased()) { - // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly - bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - } else { - int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - gate.getNumberOfInputChannels() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), - maxNumberOfMemorySegments); - } + // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly + bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); + gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); --- End diff -- What about the non-bounded partition type that we use for batch processing? Shouldn't we use an unbounded number of floating buffers there, as previously? 
> Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On the sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from the receiver. Upon receiving {{PartitionRequest}} and > {{AddCredit}} messages, the {{currentCredit}} is increased by the received > deltas. > Each view also maintains an atomic boolean field marking it as registered > available for transfer, to make sure it is enqueued in the handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
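The sender-side bookkeeping described in the issue can be sketched as follows. This is a minimal, hypothetical model (the real subpartition view also interacts with the Netty handler and its buffer queue): credits accumulate via deltas, and the view is enqueued at most once when credit goes from zero to positive while data is available.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of the sender-side view state: an atomic credit counter plus
// an atomic "registered available" flag guaranteeing a single enqueue.
class SubpartitionViewSketch {
    final AtomicInteger currentCredit = new AtomicInteger();
    final AtomicBoolean registeredAvailable = new AtomicBoolean();
    boolean hasBuffers = true; // stand-in for "buffers available in the subpartition"
    int enqueueCount;

    void addCredit(int delta) {
        int previous = currentCredit.getAndAdd(delta);
        if (previous == 0 && delta > 0 && hasBuffers
                && registeredAvailable.compareAndSet(false, true)) {
            enqueueCount++; // stand-in for enqueueing the view in the handler
        }
    }
}
```

Two successive credit announcements raise the counter but enqueue the view only once, as the issue requires.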
[jira] [Commented] (FLINK-8085) Thin out the LogicalSlot interface
[ https://issues.apache.org/jira/browse/FLINK-8085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267082#comment-16267082 ] ASF GitHub Bot commented on FLINK-8085: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5087 [FLINK-8085] Thin out LogicalSlot interface ## What is the purpose of the change Remove isCanceled, isReleased method and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows to fail an implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. This PR is based on #5086. ## Brief change log - Remove unnecessary methods from `LogicalSlot` interface - Introduce abstraction for logical slot payload - Let `Execution` implement `LogicalSlot.Payload` ## Verifying this change This change added tests and can be verified as follows: - Added `ExecutionTest#testTerminationFutureIsCompletedAfterSlotRelease` to check that the returned termination future of the `ExecutionVertex#cancel` is completed after the assigned resource has been released. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) CC: @GJL You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink thinoutLogicalSlot Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5087.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5087 commit d30dde83548dbeff4249f3b57b67cdb6247af510 Author: Till Rohrmann Date: 2017-11-14T22:50:52Z [FLINK-8078] Introduce LogicalSlot interface The LogicalSlot interface decouples the task deployment from the actual slot implementation which at the moment is Slot, SimpleSlot and SharedSlot. This is a helpful step to introduce a different slot implementation for Flip-6. commit e5da9566a6fc8a36ac8b06bae911c0dff5554e5d Author: Till Rohrmann Date: 2017-11-15T13:20:27Z [FLINK-8085] Thin out LogicalSlot interface Remove isCanceled, isReleased method and decouple logical slot from Execution by introducing a Payload interface which is set for a LogicalSlot. The Payload interface is implemented by the Execution and allows failing the implementation and obtaining a termination future. Introduce proper Execution#releaseFuture which is completed once the Execution's assigned resource has been released. > Thin out the LogicalSlot interface > -- > > Key: FLINK-8085 > URL: https://issues.apache.org/jira/browse/FLINK-8085 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > The {{LogicalSlot}} interface contains methods which we don't strictly need > (e.g. {{isCanceled}}, {{isReleased}}). Moreover, we should decouple the > {{LogicalSlot}} from the {{Execution}} by only setting the > {{ExecutionAttemptID}} instead of {{Execution}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
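The payload decoupling described in the PR can be sketched like this. This is an illustrative interface only, assuming the shape described in the PR text (the actual `LogicalSlot.Payload` in Flink may differ in detail): the slot only knows it can fail its payload and observe its termination, and `Execution` would be one implementation.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the Payload abstraction: fail the payload, observe termination.
interface Payload {
    void fail(Throwable cause);
    CompletableFuture<?> getTerminationFuture();
}

// A trivial implementation standing in for Execution in this sketch.
class FailablePayload implements Payload {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    @Override
    public void fail(Throwable cause) {
        // Failing the payload completes its termination future exceptionally.
        terminationFuture.completeExceptionally(cause);
    }

    @Override
    public CompletableFuture<?> getTerminationFuture() {
        return terminationFuture;
    }
}
```

With this shape, the slot never needs `isCanceled`/`isReleased` on the payload; it just fails it and waits on the termination future.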
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153261175 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -152,6 +170,26 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof RemoteInputChannel) { + boolean triggerWrite = inputChannelsWithCredit.isEmpty(); --- End diff -- how about some small comment as in `PartitionRequestQueue`? Something like ``` // Queue an input channel for available credits announcement. // If the queue is empty, we try to trigger the actual write. // Otherwise this will be handled by the // writeAndFlushNextMessageIfPossible calls. ``` ---
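The comment the reviewer suggests describes a common Netty pattern: enqueue, and only trigger a write when the queue transitions from empty. A synchronous sketch of that pattern follows (illustrative names; the real handler issues the next write from a channel future listener rather than a loop):

```java
import java.util.ArrayDeque;

// Sketch of the enqueue-and-trigger pattern: if the queue was empty, the
// caller starts the write; otherwise an in-flight write's completion callback
// drains the new entry. A loop stands in for the Netty listener chain here.
class CreditAnnouncerSketch {
    private final ArrayDeque<String> inputChannelsWithCredit = new ArrayDeque<>();
    int messagesWritten;

    void notifyCreditAvailable(String channelId) {
        // Queue an input channel for available-credit announcement. If the
        // queue was empty, trigger the actual write; otherwise this entry is
        // handled by the writeAndFlushNextMessageIfPossible calls in flight.
        boolean triggerWrite = inputChannelsWithCredit.isEmpty();
        inputChannelsWithCredit.add(channelId);
        if (triggerWrite) {
            writeAndFlushNextMessageIfPossible();
        }
    }

    private void writeAndFlushNextMessageIfPossible() {
        // In Netty one message is written per call and the next write is
        // issued from the write-completion listener; the loop models that.
        while (!inputChannelsWithCredit.isEmpty()) {
            inputChannelsWithCredit.poll();
            messagesWritten++;
        }
    }
}
```

The `triggerWrite` check is what keeps concurrent notifications from starting overlapping write loops.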
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153269318 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -283,10 +283,13 @@ public String toString() { // /** -* Enqueue this input channel in the pipeline for sending unannounced credits to producer. +* Enqueue this input channel in the pipeline for notifying the producer of unannounced credit. */ void notifyCreditAvailable() { - //TODO in next PR + // We should skip the notification if this channel is already released. + if (!isReleased.get() && partitionRequestClient != null) { --- End diff -- shouldn't we ``` checkState(partitionRequestClient != null, "Tried to send credit announcement to producer before requesting a queue."); ``` here as well? At the moment I don't see a valid use case for `== null` and only a potential problem with the notification not being tried again. ---
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153258008 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -88,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) { } } + void notifyCreditAvailable(final RemoteInputChannel inputChannel) { --- End diff -- Can you please add a comment on the circumstances under which this must not be called, i.e. we must make sure `ctx` is already assigned (so only after the channel has been activated somehow). I checked the uses of this method and those seem to be safe, i.e. in `RemoteInputChannel`'s `#notifyBufferAvailable()`, `#onSenderBacklog()`, and `#recycle()`. All of these should only happen after some interaction with the channel. ---
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153263139 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -274,4 +313,49 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.B bufferOrEvent.releaseBuffer(); } } + + private void writeAndFlushNextMessageIfPossible(Channel channel) { --- End diff -- Please add some javadoc with a hint how all `inputChannelsWithCredit` will be handled, i.e. one is written immediately, following ones after successful writes. ---
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r153266186 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -539,4 +542,60 @@ static CloseRequest readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws return new CloseRequest(); } } + + static class AddCredit extends NettyMessage { --- End diff -- Please add a comment ("incremental credit announcement from the client to the server"?). ---
[GitHub] flink issue #5047: Code refine of WordWithCount
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5047 Do we still want to make this change since the PR is now only adding a single comment? ---
[jira] [Updated] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-8126: -- Fix Version/s: 1.4.0 > Update and fix checkstyle > - > > Key: FLINK-8126 > URL: https://issues.apache.org/jira/browse/FLINK-8126 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.4.0, 1.5.0 > > > Our current checkstyle configuration (checkstyle version 6.19) is missing > some ImportOrder and variable naming errors which are detected in 1) IntelliJ > using the same checkstyle version and 2) with the maven-checkstyle-plugin > with an up-to-date checkstyle version (8.4). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267309#comment-16267309 ]

ASF GitHub Bot commented on FLINK-8126:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5061
[GitHub] flink pull request #5061: [hotfix] [docs] Update checkstyle version in docum...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5061 ---
[jira] [Closed] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan closed FLINK-8126.
-----------------------------
    Resolution: Fixed

1.4: 4eae418b410c928b8e4b7893c1f5b9c48a5e3228
[jira] [Reopened] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan reopened FLINK-8126:
-------------------------------